diff --git a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml index 87ff85308d15..4c3a0a98194c 100644 --- a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml @@ -48,6 +48,7 @@ body: - celery - cloudant - cncf-kubernetes + - common-io - common-sql - daskexecutor - databricks diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 28e0a4137293..8f5b5a4e75dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -830,6 +830,7 @@ jobs: # pip download --no-deps --dest dist apache-airflow-providers-==3.1.0 # rm -vf dist/apache_airflow_providers_openlineage*.whl + rm -rf dist/apache_airflow_providers_common_io*.whl - name: "Get all provider extras as AIRFLOW_EXTRAS env variable" # Extras might be different on S3 so rather than relying on "all" we should get the list of # packages to be installed from the current provider_dependencies.json file diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index a66d485b5fd6..a3fe3386eb3b 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -671,15 +671,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, -cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud, -deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, -druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, -google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, -ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, -mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas, -papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba, -segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk +cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog, +dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, +doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, +github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, +jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, +microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, +oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq, +redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp, +snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv, +webhdfs, winrm, yandex, zendesk .. 
END EXTRAS HERE Provider packages diff --git a/Dockerfile.ci b/Dockerfile.ci index 22a1b59a303f..48301ba2324f 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1007,7 +1007,7 @@ if [[ ${UPGRADE_BOTO=} == "true" ]]; then echo echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}" echo - pip uninstall --root-user-action ignore aiobotocore -y || true + pip uninstall --root-user-action ignore aiobotocore s3fs -y || true pip install --root-user-action ignore --upgrade boto3 botocore pip check fi @@ -1468,7 +1468,7 @@ RUN echo "Airflow version: ${AIRFLOW_VERSION}" # Without grpcio-status limit, pip gets into very long backtracking # We should attempt to remove it in the future # -ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0" +ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0 aiobotocore>=2.7.0" ARG UPGRADE_TO_NEWER_DEPENDENCIES="false" ARG VERSION_SUFFIX_FOR_PYPI="" diff --git a/INSTALL b/INSTALL index 027d4a621c00..da9846ac95e9 100644 --- a/INSTALL +++ b/INSTALL @@ -98,15 +98,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam, apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs, apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups, -cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud, -deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker, -druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google, -google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes, -ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql, -mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas, -papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba, -segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd, -tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk +cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog, +dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, +doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, +github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, +jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, +microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, +oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq, +redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp, +snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv, +webhdfs, winrm, yandex, zendesk # END EXTRAS HERE # For installing Airflow in development environments - see CONTRIBUTING.rst diff --git a/airflow/example_dags/tutorial_objectstorage.py b/airflow/example_dags/tutorial_objectstorage.py new file mode 100644 index 000000000000..5394238c7ad2 --- /dev/null +++ b/airflow/example_dags/tutorial_objectstorage.py @@ -0,0 
+1,135 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +# [START tutorial] +# [START import_module] +import pendulum +import requests + +from airflow.decorators import dag, task +from airflow.io.store.path import ObjectStoragePath + +# [END import_module] + +API = "https://opendata.fmi.fi/timeseries" + +aq_fields = { + "fmisid": "int32", + "time": "datetime64[ns]", + "AQINDEX_PT1H_avg": "float64", + "PM10_PT1H_avg": "float64", + "PM25_PT1H_avg": "float64", + "O3_PT1H_avg": "float64", + "CO_PT1H_avg": "float64", + "SO2_PT1H_avg": "float64", + "NO2_PT1H_avg": "float64", + "TRSC_PT1H_avg": "float64", +} + +# [START create_object_storage_path] +base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default") +# [END create_object_storage_path] + + +# [START instantiate_dag] +@dag( + schedule=None, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], +) +def tutorial_objectstorage(): + """ + ### Object Storage Tutorial Documentation + This is a tutorial DAG to showcase the usage of the Object Storage API. + Documentation that goes along with the Airflow Object Storage tutorial is + located + [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html) + """ + # [END instantiate_dag] + import duckdb + import pandas as pd + + # [START get_air_quality_data] + @task + def get_air_quality_data(**kwargs) -> ObjectStoragePath: + """ + #### Get Air Quality Data + This task gets air quality data from the Finnish Meteorological Institute's + open data API. The data is saved as parquet. 
+ """ + execution_date = kwargs["logical_date"] + start_time = kwargs["data_interval_start"] + + params = { + "format": "json", + "precision": "double", + "groupareas": "0", + "producer": "airquality_urban", + "area": "Uusimaa", + "param": ",".join(aq_fields.keys()), + "starttime": start_time.isoformat(timespec="seconds"), + "endtime": execution_date.isoformat(timespec="seconds"), + "tz": "UTC", + } + + response = requests.get(API, params=params) + response.raise_for_status() + + # ensure the bucket exists + base.mkdir(exists_ok=True) + + formatted_date = execution_date.format("YYYYMMDD") + path = base / f"air_quality_{formatted_date}.parquet" + + df = pd.DataFrame(response.json()).astype(aq_fields) + with path.open("wb") as file: + df.to_parquet(file) + + return path + + # [END get_air_quality_data] + + # [START analyze] + @task + def analyze(path: ObjectStoragePath, **kwargs): + """ + #### Analyze + This task analyzes the air quality data, prints the results + """ + conn = duckdb.connect(database=":memory:") + conn.register_filesystem(path.fs) + conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')") + + df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf() + + print(df2.head()) + + # [END analyze] + + # [START main_flow] + obj_path = get_air_quality_data() + analyze(obj_path) + # [END main_flow] + + +# [START dag_invocation] +tutorial_objectstorage() +# [END dag_invocation] +# [END tutorial] diff --git a/airflow/io/__init__.py b/airflow/io/__init__.py new file mode 100644 index 000000000000..9f456eaddcd0 --- /dev/null +++ b/airflow/io/__init__.py @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import logging +from typing import ( + TYPE_CHECKING, + Callable, +) + +from fsspec.implementations.local import LocalFileSystem + +from airflow.compat.functools import cache +from airflow.providers_manager import ProvidersManager +from airflow.stats import Stats +from airflow.utils.module_loading import import_string + +if TYPE_CHECKING: + from fsspec import AbstractFileSystem + +log = logging.getLogger(__name__) + + +def _file(_: str | None) -> LocalFileSystem: + return LocalFileSystem() + + +# builtin supported filesystems +_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None], AbstractFileSystem]] = { + "file": _file, +} + + +@cache +def _register_filesystems() -> dict[str, Callable[[str | None], AbstractFileSystem]]: + scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy() + with Stats.timer("airflow.io.load_filesystems") as timer: + manager = ProvidersManager() + for fs_module_name in manager.filesystem_module_names: + fs_module = import_string(fs_module_name) + for scheme in getattr(fs_module, "schemes", []): + if scheme in scheme_to_fs: + log.warning("Overriding scheme %s for %s", scheme, fs_module_name) + + method = getattr(fs_module, "get_fs", None) + if method is None: + raise ImportError(f"Filesystem {fs_module_name} does not have a get_fs method") + scheme_to_fs[scheme] = method + + log.debug("loading filesystems from providers took %.3f seconds", timer.duration) + return scheme_to_fs + + +def get_fs(scheme: str, conn_id: str | None = None) -> AbstractFileSystem: + """ + Get a filesystem by scheme. + + :param scheme: the scheme to get the filesystem for + :return: the filesystem method + :param conn_id: the airflow connection id to use + """ + filesystems = _register_filesystems() + try: + return filesystems[scheme](conn_id) + except KeyError: + raise ValueError(f"No filesystem registered for scheme {scheme}") + + +def has_fs(scheme: str) -> bool: + """ + Check if a filesystem is available for a scheme. + + :param scheme: the scheme to check + :return: True if a filesystem is available for the scheme + """ + return scheme in _register_filesystems() diff --git a/airflow/io/store/__init__.py b/airflow/io/store/__init__.py new file mode 100644 index 000000000000..4385b2e988d5 --- /dev/null +++ b/airflow/io/store/__init__.py @@ -0,0 +1,141 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
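Note on the registry just added in airflow/io/__init__.py: the local "file" scheme is registered unconditionally, so the lookup API can be exercised without any provider installed. A minimal illustrative sketch follows; the temporary directory path is a placeholder, and only fsspec's standard calls are used.

# Illustrative sketch only: the "file" scheme is built in and resolves to fsspec's
# LocalFileSystem via _BUILTIN_SCHEME_TO_FS; the /tmp path below is a placeholder.
from airflow.io import get_fs, has_fs

assert has_fs("file")
fs = get_fs("file")  # LocalFileSystem instance
fs.makedirs("/tmp/airflow-io-demo", exist_ok=True)
fs.pipe_file("/tmp/airflow-io-demo/hello.txt", b"hello")
print(fs.cat_file("/tmp/airflow-io-demo/hello.txt"))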
+from __future__ import annotations + +from typing import TYPE_CHECKING, ClassVar + +from airflow.io import get_fs, has_fs +from airflow.utils.module_loading import qualname + +if TYPE_CHECKING: + from fsspec import AbstractFileSystem + + +class ObjectStore: + """Manages a filesystem or object storage.""" + + __version__: ClassVar[int] = 1 + + method: str + conn_id: str | None + protocol: str + + _fs: AbstractFileSystem = None + + def __init__(self, protocol: str, conn_id: str | None, fs: AbstractFileSystem | None = None): + self.conn_id = conn_id + self.protocol = protocol + self._fs = fs + + def __str__(self): + return f"{self.protocol}-{self.conn_id}" if self.conn_id else self.protocol + + @property + def fs(self) -> AbstractFileSystem: + self._connect() + return self._fs + + @property + def fsid(self) -> str: + """ + Get the filesystem id for this store in order to be able to compare across instances. + + The underlying `fsid` is returned from the filesystem if available, otherwise it is generated + from the protocol and connection ID. + + :return: deterministic the filesystem ID + """ + self._connect() + try: + return self._fs.fsid + except NotImplementedError: + return f"{self.fs.protocol}-{self.conn_id or 'env'}" + + def serialize(self): + return { + "protocol": self.protocol, + "conn_id": self.conn_id, + "filesystem": qualname(self._fs) if self._fs else None, + } + + @classmethod + def deserialize(cls, data: dict[str, str], version: int): + if version > cls.__version__: + raise ValueError(f"Cannot deserialize version {version} for {cls.__name__}") + + protocol = data["protocol"] + conn_id = data["conn_id"] + + alias = f"{protocol}-{conn_id}" if conn_id else protocol + + if store := _STORE_CACHE.get(alias, None): + return store + + if not has_fs(protocol): + if "filesystem" in data and data["filesystem"]: + raise ValueError( + f"No attached filesystem found for {data['filesystem']} with " + f"protocol {data['protocol']}. Please use attach() for this protocol and filesystem." + ) + + return attach(protocol=protocol, conn_id=conn_id) + + def _connect(self): + if self._fs is None: + self._fs = get_fs(self.protocol, self.conn_id) + + def __eq__(self, other): + return isinstance(other, type(self)) and other.conn_id == self.conn_id and other._fs == self._fs + + +_STORE_CACHE: dict[str, ObjectStore] = {} + + +def attach( + protocol: str | None = None, + conn_id: str | None = None, + alias: str | None = None, + encryption_type: str | None = "", + fs: AbstractFileSystem | None = None, +) -> ObjectStore: + """ + Attach a filesystem or object storage. 
+ + :param alias: the alias to be used to refer to the store, autogenerated if omitted + :param protocol: the scheme that is used without :// + :param conn_id: the connection to use to connect to the filesystem + :param encryption_type: the encryption type to use to connect to the filesystem + :param fs: the filesystem type to use to connect to the filesystem + """ + if alias: + if store := _STORE_CACHE.get(alias, None): + return store + elif not protocol: + raise ValueError(f"No registered store with alias: {alias}") + + if not protocol: + raise ValueError("No protocol specified and no alias provided") + + if not alias: + alias = f"{protocol}-{conn_id}" if conn_id else protocol + if store := _STORE_CACHE.get(alias, None): + return store + + store = ObjectStore(protocol=protocol, conn_id=conn_id, fs=fs) + _STORE_CACHE[alias] = store + + return store diff --git a/airflow/io/store/path.py b/airflow/io/store/path.py new file mode 100644 index 000000000000..7ee097185514 --- /dev/null +++ b/airflow/io/store/path.py @@ -0,0 +1,764 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
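For reference, a small sketch of how attach() from airflow/io/store/__init__.py above can pre-register a store under an alias with an explicit fsspec filesystem; the alias name and the choice of LocalFileSystem are illustrative only.

# Illustrative only: pre-register a store under an alias with an explicit fsspec
# filesystem instance; later calls with the same alias are served from _STORE_CACHE.
from fsspec.implementations.local import LocalFileSystem

from airflow.io.store import attach

store = attach(protocol="file", alias="scratch", fs=LocalFileSystem())
assert attach(alias="scratch") is store  # resolved from the cache, no protocol needed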
+from __future__ import annotations + +import contextlib +import os +import shutil +import typing +from io import UnsupportedOperation +from stat import S_ISLNK + +from fsspec.utils import stringify_path + +from airflow.io.store import ObjectStore, attach +from airflow.io.store.stat import stat_result + +if typing.TYPE_CHECKING: + from fsspec import AbstractFileSystem + + +def _rewrite_info(info: dict, store: ObjectStore) -> dict: + info["name"] = ObjectStoragePath(info["name"], store=store) + return info + + +class ObjectStoragePath(os.PathLike): + """A path-like object for object storage.""" + + __version__: typing.ClassVar[int] = 1 + + sep: typing.ClassVar[str] = "/" + root_marker: typing.ClassVar[str] = "/" + + _store: ObjectStore | None + _bucket: str + _key: str + _conn_id: str | None + _protocol: str + _hash: int | None + + __slots__ = ( + "_store", + "_bucket", + "_key", + "_conn_id", + "_protocol", + "_hash", + ) + + def __init__( + self, path: str | ObjectStoragePath, conn_id: str | None = None, store: ObjectStore | None = None + ): + self._conn_id = conn_id + self._store = store + + self._hash = None + + if isinstance(path, ObjectStoragePath): + self._protocol = path._protocol + self._bucket = path._bucket + self._key = path._key + self._store = path._store + else: + self._protocol, self._bucket, self._key = self.split_path(path) + + if store: + self._conn_id = store.conn_id + self._protocol = self._protocol if self._protocol else store.protocol + elif self._protocol and not self._store: + self._store = attach(self._protocol, conn_id) + + @classmethod + def split_path(cls, path) -> tuple[str, str, str]: + protocol = "" + key = "" + + path = stringify_path(path) + + i = path.find("://") + if i > 0: + protocol = path[:i] + path = path[i + 3 :] + + if cls.sep not in path: + bucket = path + else: + bucket, key = path.split(cls.sep, 1) + + # we don't care about versions etc + return protocol, bucket, key + + def __fspath__(self): + return self.__str__() + + def __repr__(self): + return f"<{type(self).__name__}('{self}')>" + + def __str__(self): + path = ( + f"{self._protocol}://{self._bucket}/{self._key}" + if self._protocol + else f"{self._bucket}/{self._key}" + ) + + return path + + def __lt__(self, other): + if not isinstance(other, ObjectStoragePath): + return NotImplemented + + return self._bucket < other._bucket + + def __le__(self, other): + if not isinstance(other, ObjectStoragePath): + return NotImplemented + + return self._bucket <= other._bucket + + def __eq__(self, other): + if not isinstance(other, ObjectStoragePath): + return NotImplemented + + return self._bucket == other._bucket + + def __ne__(self, other): + if not isinstance(other, ObjectStoragePath): + return NotImplemented + + return self._bucket != other._bucket + + def __gt__(self, other): + if not isinstance(other, ObjectStoragePath): + return NotImplemented + + return self._bucket > other._bucket + + def __ge__(self, other): + if not isinstance(other, ObjectStoragePath): + return NotImplemented + + return self._bucket >= other._bucket + + def __hash__(self): + if not self._hash: + self._hash = hash(self._bucket) + + return self._hash + + def __truediv__(self, other) -> ObjectStoragePath: + o_protocol, o_bucket, o_key = self.split_path(other) + if not isinstance(other, str) and o_bucket and self._bucket != o_bucket: + raise ValueError("Cannot combine paths from different buckets / containers") + + if o_protocol and self._protocol != o_protocol: + raise ValueError("Cannot combine paths from different protocols") 
+
+        path = f"{stringify_path(self).rstrip(self.sep)}/{stringify_path(other).lstrip(self.sep)}"
+        return ObjectStoragePath(path, conn_id=self._conn_id)
+
+    def _unsupported(self, method_name):
+        msg = f"{type(self).__name__}.{method_name}() is unsupported"
+        raise UnsupportedOperation(msg)
+
+    def samestore(self, other):
+        return isinstance(other, ObjectStoragePath) and self._store == other._store
+
+    @property
+    def container(self) -> str:
+        return self._bucket
+
+    @property
+    def bucket(self) -> str:
+        return self._bucket
+
+    @property
+    def key(self) -> str:
+        return self._key
+
+    @property
+    def store(self) -> ObjectStore:
+        if not self._store:
+            raise ValueError("Cannot do operations. No store attached.")
+
+        return self._store
+
+    @property
+    def protocol(self) -> str:
+        return self._protocol
+
+    @property
+    def fs(self) -> AbstractFileSystem:
+        return self.store.fs
+
+    @property
+    def parent(self) -> ObjectStoragePath:
+        return ObjectStoragePath(self.store.fs._parent(str(self)), store=self.store)
+
+    def stat(self, *, follow_symlinks=True):
+        """Return the result of the `stat()` call."""  # noqa: D402
+        stat = self.store.fs.stat(self)
+        stat.update(
+            {
+                "protocol": self.store.protocol,
+                "conn_id": self.store.conn_id,
+            }
+        )
+        return stat_result(stat)
+
+    def lstat(self):
+        """Like stat() except that it doesn't follow symlinks."""
+        return self.stat(follow_symlinks=False)
+
+    def exists(self):
+        """Whether this path exists."""
+        return self.store.fs.exists(self)
+
+    def is_dir(self):
+        """Return True if this path is directory like."""
+        return self.store.fs.isdir(self)
+
+    def is_file(self):
+        """Return True if this path is a regular file."""
+        return self.store.fs.isfile(self)
+
+    def is_mount(self):
+        return self._unsupported("is_mount")
+
+    def is_symlink(self):
+        """Whether this path is a symbolic link."""
+        try:
+            return S_ISLNK(self.lstat().st_mode)
+        except OSError:
+            # Path doesn't exist
+            return False
+        except ValueError:
+            # Non-encodable path
+            return False
+
+    def is_block_device(self):
+        self._unsupported("is_block_device")
+
+    def is_char_device(self):
+        self._unsupported("is_char_device")
+
+    def is_fifo(self):
+        self._unsupported("is_fifo")
+
+    def is_socket(self):
+        self._unsupported("is_socket")
+
+    def samefile(self, other_path):
+        """Return whether other_path refers to the same file as this path."""
+        if not isinstance(other_path, ObjectStoragePath):
+            return False
+
+        st = self.stat()
+        other_st = other_path.stat()
+
+        return (
+            st["protocol"] == other_st["protocol"]
+            and st["conn_id"] == other_st["conn_id"]
+            and st["ino"] == other_st["ino"]
+        )
+
+    def checksum(self):
+        """Return the checksum of the file at this path."""
+        return self.store.fs.checksum(self)
+
+    def open(
+        self,
+        mode="rb",
+        block_size=None,
+        cache_options=None,
+        compression=None,
+        encoding=None,
+        errors=None,
+        newline=None,
+        **kwargs,
+    ):
+        """
+        Return a file-like object from the filesystem.
+
+        The resultant instance must function correctly in a context 'with' block.
+
+        :param mode: str like 'rb', 'w'
+            See builtin 'open()'.
+        :param block_size: int
+            Some indication of buffering - this is a value in bytes.
+        :param cache_options: dict, optional
+            Extra arguments to pass through to the cache.
+        :param compression: string or None
+            If given, open file using a compression codec. Can either be a compression
+            name (a key in 'fsspec.compression.compr') or 'infer' to guess the
+            compression from the filename suffix.
+        :param encoding: passed on to TextIOWrapper for text mode
+        :param errors: passed on to TextIOWrapper for text mode
+        :param newline: passed on to TextIOWrapper for text mode
+
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        return self.store.fs.open(
+            str(self),
+            mode=mode,
+            block_size=block_size,
+            cache_options=cache_options,
+            compression=compression,
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+            **kwargs,
+        )
+
+    def read_bytes(self, start: int | None = None, end: int | None = None):
+        """Open the file in bytes mode, read it, and close the file."""
+        return self.store.fs.read_bytes(str(self), start=start, end=end)
+
+    def read_text(self, encoding=None, errors=None, newline=None, **kwargs):
+        """Open the file in text mode, read it, and close the file."""
+        return self.store.fs.read_text(str(self), encoding=encoding, errors=errors, newline=newline, **kwargs)
+
+    def write_bytes(self, data, **kwargs):
+        """Open the file in bytes mode, write to it, and close the file."""
+        self.store.fs.pipe_file(self, value=data, **kwargs)
+
+    def write_text(self, data, encoding=None, errors=None, newline=None, **kwargs):
+        """Open the file in text mode, write to it, and close the file."""
+        return self.store.fs.write_text(
+            str(self), value=data, encoding=encoding, errors=errors, newline=newline, **kwargs
+        )
+
+    def iterdir(self):
+        """Iterate over the files in this directory."""
+        return self._unsupported("iterdir")
+
+    def _scandir(self):
+        # Emulate os.scandir(), which returns an object that can be used as a
+        # context manager.
+        return contextlib.nullcontext(self.iterdir())
+
+    def glob(self, pattern: str, maxdepth: int | None = None, **kwargs):
+        """
+        Find files by glob-matching.
+
+        If the path ends with '/', only folders are returned.
+
+        We support ``"**"``,
+        ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.
+
+        The `maxdepth` option is applied on the first `**` found in the path.
+
+        Search path names that contain embedded characters special to this
+        implementation of glob may not produce expected results;
+        e.g., 'foo/bar/*starredfilename*'.
+
+        :param pattern: str
+            The glob pattern to match against.
+        :param maxdepth: int or None
+            The maximum depth to search. If None, there is no depth limit.
+
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        path = os.path.join(self._bucket, pattern)
+
+        detail = kwargs.get("detail", False)
+        items = self.store.fs.glob(path, maxdepth=maxdepth, **kwargs)
+        if detail:
+            t = {
+                ObjectStoragePath(k, store=self.store): _rewrite_info(v, self.store) for k, v in items.items()
+            }
+            return t
+        else:
+            return [ObjectStoragePath(c, store=self.store) for c in items]
+
+    def rglob(self, maxdepth: int | None = None, **kwargs):
+        self._unsupported("rglob")
+
+    def walk(self, maxdepth: int | None = None, topdown: bool = True, on_error: str = "omit", **kwargs):
+        """
+        Return all files below the path.
+
+        List all files, recursing into subdirectories; output is iterator-style,
+        like ``os.walk()``. For a simple list of files, ``find()`` is available.
+
+        When topdown is True, the caller can modify the dirnames list in-place (perhaps
+        using del or slice assignment), and walk() will
+        only recurse into the subdirectories whose names remain in dirnames;
+        this can be used to prune the search, impose a specific order of visiting,
+        or even to inform walk() about directories the caller creates or renames before
+        it resumes walk() again.
+        Modifying dirnames when topdown is False has no effect.
+        (see os.walk)
+
+        Note that the "files" outputted will include anything that is not
+        a directory, such as links.
+
+        :param maxdepth: int or None
+            Maximum recursion depth. None means limitless, but not recommended
+            on link-based file-systems.
+        :param topdown: bool (True)
+            Whether to walk the directory tree from the top downwards or from
+            the bottom upwards.
+        :param on_error: "omit", "raise", or a callable
+            if omit (default), a path with an exception will simply be empty;
+            if raise, the underlying exception will be raised;
+            if callable, it will be called with a single OSError instance as argument
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        detail = kwargs.get("detail", False)
+        items = self.store.fs.walk(str(self), maxdepth=maxdepth, topdown=topdown, on_error=on_error, **kwargs)
+        if not detail:
+            for path, dirs, files in items:
+                yield ObjectStoragePath(path, store=self.store), dirs, files
+        else:
+            for path, dirs, files in items:
+                yield (
+                    ObjectStoragePath(path, store=self.store),
+                    {k: _rewrite_info(v, self.store) for k, v in dirs.items()},
+                    {k: _rewrite_info(v, self.store) for k, v in files.items()},
+                )
+
+    def ls(self, detail: bool = True, **kwargs) -> list[ObjectStoragePath] | list[dict]:
+        """
+        List files at path.
+
+        :param detail: bool
+            If True, return a dict containing details about each entry, otherwise
+            return a list of paths.
+
+        kwargs: Additional keyword arguments to be passed on.
+        """
+        items = self.store.fs.ls(str(self), detail=detail, **kwargs)
+
+        if detail:
+            return [_rewrite_info(c, self.store) for c in items]
+        else:
+            return [ObjectStoragePath(c, store=self.store) for c in items]
+
+    def absolute(self):
+        """Return an absolute version of this path, resolving any aliases."""
+        path = f"{self.store.protocol}://{self._bucket}/{self._key}"
+        return path
+
+    def touch(self, truncate: bool = True):
+        """Create an empty file, or update the timestamp.
+
+        :param truncate: bool (True)
+            If True, always set the file size to 0; if False, update the timestamp and
+            leave the file unchanged, if the backend allows this.
+        """
+        return self.store.fs.touch(str(self), truncate=truncate)
+
+    def mkdir(self, create_parents: bool = True, exists_ok: bool = False, **kwargs):
+        """
+        Create a directory entry at the specified path or within a bucket/container.
+
+        For systems that don't have true directories, it may create a directory entry
+        for this instance only and not affect the real filesystem.
+
+        :param create_parents: bool
+            if True, this is equivalent to 'makedirs'.
+        :param exists_ok: bool
+            if True, do not raise an error if the target directory already exists.
+
+        kwargs: Additional keyword arguments, which may include permissions, etc.
+        """
+        if not exists_ok and self.exists():
+            raise FileExistsError(f"Target {self} exists")
+
+        try:
+            self.store.fs.mkdir(str(self), create_parents=create_parents, **kwargs)
+        except FileExistsError:
+            pass
+
+    def unlink(self, recursive: bool = False, maxdepth: int | None = None):
+        """
+        Remove this file or link.
+
+        If the path is a directory, use rmdir() instead.
+        """
+        self.store.fs.rm(str(self), recursive=recursive, maxdepth=maxdepth)
+
+    def rm(self, recursive: bool = False, maxdepth: int | None = None):
+        """
+        Remove this file or link.
+
+        Alias of unlink
+        """
+        self.unlink(recursive=recursive, maxdepth=maxdepth)
+
+    def rmdir(self):
+        """Remove this directory.
The directory must be empty.""" + return self.store.fs.rmdir(str(self)) + + def rename(self, target: str | ObjectStoragePath, overwrite=False): + """ + Rename this path to the target path. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + if isinstance(target, str): + target = ObjectStoragePath(target, store=self.store) + + if not self.samestore(target): + raise ValueError("You can only rename within the same store") + + if not overwrite: + if self.store.fs.exists(target): + raise FileExistsError(f"Target {target} exists") + + return ObjectStoragePath(self.store.fs.mv(str(self), target), store=self._store) + + def replace(self, target: str | ObjectStoragePath): + """ + Rename this path to the target path, overwriting if that path exists. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + return self.rename(target, overwrite=True) + + # EXTENDED OPERATIONS + + def ukey(self): + """Hash of file properties, to tell if it has changed.""" + return self.store.fs.ukey(str(self)) + + def read_block(self, offset: int, length: int, delimiter=None): + r"""Read a block of bytes. + + Starting at ``offset`` of the file, read ``length`` bytes. If + ``delimiter`` is set then we ensure that the read starts and stops at + delimiter boundaries that follow the locations ``offset`` and ``offset + + length``. If ``offset`` is zero then we start at zero. The + bytestring returned WILL include the end delimiter string. + + If offset+length is beyond the eof, reads to eof. + + :param offset: int + Byte offset to start read + :param length: int + Number of bytes to read. If None, read to the end. + :param delimiter: bytes (optional) + Ensure reading starts and stops at delimiter bytestring + + Examples + -------- + >>> read_block(0, 13) + b'Alice, 100\\nBo' + >>> read_block(0, 13, delimiter=b'\\n') + b'Alice, 100\\nBob, 200\\n' + + Use ``length=None`` to read to the end of the file. + >>> read_block(0, None, delimiter=b'\\n') + b'Alice, 100\\nBob, 200\\nCharlie, 300' + + See Also + -------- + :func:`fsspec.utils.read_block` + """ + return self.store.fs.read_block(str(self), offset, length, delimiter=delimiter) + + def sign(self, expiration: int = 100, **kwargs): + """Create a signed URL representing the given path. + + Some implementations allow temporary URLs to be generated, as a + way of delegating credentials. + + :param path: str + The path on the filesystem + :param expiration: int + Number of seconds to enable the URL for (if supported) + + :returns URL: str + The signed URL + + :raises NotImplementedError: if the method is not implemented for a store + """ + return self.store.fs.sign(str(self), expiration=expiration, **kwargs) + + def size(self): + """Size in bytes of the file at this path.""" + return self.store.fs.size(self) + + def du(self, total: bool = True, maxdepth: int | None = None, withdirs: bool = False, **kwargs): + """Space used by files and optionally directories within a path. + + Directory size does not include the size of its contents. + + :param total: bool + Whether to sum all the file sizes + :param maxdepth: int or None + Maximum number of directory levels to descend, None for unlimited. 
+ :param withdirs: bool + Whether to include directory paths in the output. + + kwargs: Additional keyword arguments to be passed on. + + :returns: Dict of {path: size} if total=False, or int otherwise, where numbers + refer to bytes used. + """ + return self.store.fs.du(str(self), total=total, maxdepth=maxdepth, withdirs=withdirs, **kwargs) + + def find( + self, path: str, maxdepth: int | None = None, withdirs: bool = False, detail: bool = False, **kwargs + ): + """List all files below the specified path. + + Like posix ``find`` command without conditions. + + :param path: str + Path pattern to search. + :param maxdepth: int or None + If not None, the maximum number of levels to descend. + :param withdirs: bool + Whether to include directory paths in the output. This is True + when used by glob, but users usually only want files. + :param detail: bool + Whether to include file info. + + kwargs: Additional keyword arguments to be passed to ``ls``. + """ + path = self.sep.join([str(self), path.lstrip("/")]) + items = self.store.fs.find(path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs) + + if detail: + return { + ObjectStoragePath(k, store=self.store): _rewrite_info(v, self.store) for k, v in items.items() + } + else: + return [ObjectStoragePath(c, store=self.store) for c in items] + + def _cp_file(self, dst: str | ObjectStoragePath, **kwargs): + """Copy a single file from this path to another location by streaming the data.""" + if isinstance(dst, str): + dst = ObjectStoragePath(dst) + + # create the directory or bucket if required + if dst.key.endswith(self.sep) or not dst.key: + dst.mkdir(exists_ok=True, create_parents=True) + dst = dst / self.key + elif dst.is_dir(): + dst = dst / self.key + + # streaming copy + with self.open("rb") as f1, dst.open("wb") as f2: + # make use of system dependent buffer size + shutil.copyfileobj(f1, f2, **kwargs) + + def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None: + """Copy file(s) from this path to another location. + + For remote to remote copies, the key used for the destination will be the same as the source. + So that s3://src_bucket/foo/bar will be copied to gcs://dst_bucket/foo/bar and not + gcs://dst_bucket/bar. + + :param dst: Destination path + :param recursive: If True, copy directories recursively. + + kwargs: Additional keyword arguments to be passed to the underlying implementation. 
+ """ + if isinstance(dst, str): + dst = ObjectStoragePath(dst) + + # same -> same + if self.samestore(dst): + self.store.fs.copy(str(self), dst, recursive=recursive, **kwargs) + return + + # use optimized path for local -> remote or remote -> local + if self.store.protocol == "file": + lpath = self.store.fs._strip_protocol(str(self)) + dst.store.fs.put(lpath, str(dst), recursive=recursive, **kwargs) + return + + if dst.store.protocol == "file": + rpath = dst.store.fs._strip_protocol(str(dst)) + self.store.fs.get(str(self), rpath, recursive=recursive, **kwargs) + return + + if not self.exists(): + raise FileNotFoundError(f"{self} does not exist") + + # remote dir -> remote dir + if self.is_dir(): + if dst.is_file(): + raise ValueError("Cannot copy directory to a file.") + + dst.mkdir(exists_ok=True, create_parents=True) + + out = self.store.fs.expand_path(str(self), recursive=True, **kwargs) + source_stripped = self.store.fs._strip_protocol(str(self)) + + for path in out: + # this check prevents one extra call to is_dir() as + # glob returns self as well + if path == source_stripped: + continue + + src_obj = ObjectStoragePath(path, store=self.store) + + # skip directories, empty directories will not be created + if src_obj.is_dir(): + continue + + src_obj._cp_file(dst) + + return + + # remote file -> remote dir + if self.is_file(): + self._cp_file(dst, **kwargs) + return + + def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs) -> None: + """Move file(s) from this path to another location. + + :param path: Destination path + :param recursive: bool + If True, move directories recursively. + + kwargs: Additional keyword arguments to be passed to the underlying implementation. + """ + if isinstance(path, str): + path = ObjectStoragePath(path) + + if self.samestore(path): + return self.store.fs.move(str(self), str(path), recursive=recursive, **kwargs) + + # non-local copy + self.copy(path, recursive=recursive, **kwargs) + self.unlink(recursive=recursive) + + def serialize(self) -> dict[str, str | ObjectStore]: + return { + "path": str(self), + "store": self.store, + } + + @classmethod + def deserialize(cls, data: dict, version: int) -> ObjectStoragePath: + if version > cls.__version__: + raise ValueError(f"Cannot deserialize version {version} with version {cls.__version__}.") + + return ObjectStoragePath(**data) diff --git a/airflow/io/store/stat.py b/airflow/io/store/stat.py new file mode 100644 index 000000000000..31be488fa46a --- /dev/null +++ b/airflow/io/store/stat.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from stat import S_IFDIR, S_IFLNK, S_IFREG + + +class stat_result(dict): + """ + stat_result: Result from stat, fstat, or lstat. 
+
+    This object provides a subset of os.stat_result attributes,
+    for results returned from ObjectStoragePath.stat().
+
+    It provides st_dev, st_ino, st_mode, st_nlink, st_uid, st_gid,
+    st_size and st_mtime if they are available from the underlying
+    storage. Extended attributes may be accessed via dict access.
+
+    See os.stat for more information.
+    """
+
+    def __init__(self, *arg, **kwargs):
+        super().__init__(**kwargs)
+        self.update(*arg, **kwargs)
+
+    st_dev = property(lambda self: 0)
+    """device"""
+
+    st_size = property(lambda self: self.get("size", 0))
+    """total size, in bytes"""
+
+    st_gid = property(lambda self: self.get("gid", 0))
+    """group ID of owner"""
+
+    st_uid = property(lambda self: self.get("uid", 0))
+    """user ID of owner"""
+
+    st_ino = property(lambda self: self.get("ino", 0))
+    """inode"""
+
+    st_nlink = property(lambda self: self.get("nlink", 0))
+    """number of hard links"""
+
+    @property
+    def st_mtime(self):
+        """Time of most recent content modification."""
+        if "mtime" in self:
+            return self.get("mtime")
+
+        if "LastModified" in self:
+            return self.get("LastModified").timestamp()
+
+        # per posix.py
+        return 0
+
+    @property
+    def st_mode(self):
+        """Protection bits."""
+        if "mode" in self:
+            return self.get("mode")
+
+        # per posix.py
+        mode = 0o0
+        if self.get("type", "") == "file":
+            mode = S_IFREG
+
+        if self.get("type", "") == "directory":
+            mode = S_IFDIR
+
+        if self.get("isLink", False):
+            mode = S_IFLNK
+
+        return mode
diff --git a/airflow/provider.yaml.schema.json b/airflow/provider.yaml.schema.json
index 509e5dfce62c..567a44db7396 100644
--- a/airflow/provider.yaml.schema.json
+++ b/airflow/provider.yaml.schema.json
@@ -169,6 +169,13 @@
                 ]
             }
         },
+        "filesystems": {
+            "type": "array",
+            "description": "Filesystem module names",
+            "items": {
+                "type": "string"
+            }
+        },
         "transfers": {
             "type": "array",
             "items": {
diff --git a/airflow/provider_info.schema.json b/airflow/provider_info.schema.json
index b97bd44fe4da..e9a2571f0652 100644
--- a/airflow/provider_info.schema.json
+++ b/airflow/provider_info.schema.json
@@ -25,6 +25,13 @@
                 "deprecatedVersion": "2.2.0"
             }
         },
+        "filesystems": {
+            "type": "array",
+            "description": "Filesystem module names",
+            "items": {
+                "type": "string"
+            }
+        },
         "transfers": {
             "type": "array",
             "items": {
diff --git a/airflow/providers/amazon/aws/fs/__init__.py b/airflow/providers/amazon/aws/fs/__init__.py
new file mode 100644
index 000000000000..13a83393a912
--- /dev/null
+++ b/airflow/providers/amazon/aws/fs/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
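The two schema additions above are what let a provider advertise filesystem modules, which ProvidersManager then feeds to _register_filesystems() in airflow/io. For orientation, here is a hypothetical minimal module satisfying that contract; the "mem" scheme and the module itself are invented for illustration, while the real S3 implementation follows below.

# Hypothetical provider filesystem module, e.g. listed under "filesystems:" in a
# provider.yaml. airflow.io._register_filesystems() only needs a module-level
# `schemes` list and a `get_fs(conn_id)` callable returning an AbstractFileSystem.
from __future__ import annotations

from fsspec import AbstractFileSystem
from fsspec.implementations.memory import MemoryFileSystem

schemes = ["mem"]  # the URL schemes this module serves


def get_fs(conn_id: str | None) -> AbstractFileSystem:
    # A real provider would use conn_id to build credentials; it is ignored here.
    return MemoryFileSystem()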
diff --git a/airflow/providers/amazon/aws/fs/s3.py b/airflow/providers/amazon/aws/fs/s3.py
new file mode 100644
index 000000000000..afe13be1b43b
--- /dev/null
+++ b/airflow/providers/amazon/aws/fs/s3.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from functools import partial
+from typing import TYPE_CHECKING, Any, Callable, Dict
+
+import requests
+from botocore import UNSIGNED
+from requests import HTTPError
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
+
+if TYPE_CHECKING:
+    from botocore.awsrequest import AWSRequest
+    from fsspec import AbstractFileSystem
+
+
+Properties = Dict[str, str]
+
+S3_PROXY_URI = "proxy-uri"
+
+log = logging.getLogger(__name__)
+
+schemes = ["s3", "s3a", "s3n"]
+
+
+class SignError(Exception):
+    """Raised when unable to sign an S3 request."""
+
+
+def get_fs(conn_id: str | None) -> AbstractFileSystem:
+    try:
+        from s3fs import S3FileSystem
+    except ImportError:
+        raise ImportError(
+            "Airflow FS S3 protocol requires the s3fs library, but it is not installed as it requires "
+            "aiobotocore. Please install the s3 protocol support library by running: "
+            "pip install apache-airflow[s3]"
+        )
+
+    aws: AwsGenericHook = AwsGenericHook(aws_conn_id=conn_id, client_type="s3")
+    session = aws.get_session(deferrable=True)
+    endpoint_url = aws.conn_config.get_service_endpoint_url(service_name="s3")
+
+    config_kwargs: dict[str, Any] = aws.conn_config.extra_config.get("config_kwargs", {})
+    register_events: dict[str, Callable[[Properties], None]] = {}
+
+    s3_service_config = aws.service_config
+    if signer := s3_service_config.get("signer", None):
+        log.info("Loading signer %s", signer)
+        if signer_func := SIGNERS.get(signer):
+            uri = s3_service_config.get("signer_uri", None)
+            token = s3_service_config.get("signer_token", None)
+            if not uri or not token:
+                raise ValueError(f"Signer {signer} requires uri and token")
+
+            properties: Properties = {
+                "uri": uri,
+                "token": token,
+            }
+            signer_func_with_properties = partial(signer_func, properties)
+            register_events["before-sign.s3"] = signer_func_with_properties
+
+            # Disable the AWS Signer
+            config_kwargs["signature_version"] = UNSIGNED
+        else:
+            raise ValueError(f"Signer not available: {signer}")
+
+    if proxy_uri := s3_service_config.get(S3_PROXY_URI, None):
+        config_kwargs["proxies"] = {"http": proxy_uri, "https": proxy_uri}
+
+    fs = S3FileSystem(session=session, config_kwargs=config_kwargs, endpoint_url=endpoint_url)
+
+    for event_name, event_function in register_events.items():
+        fs.s3.meta.events.register_last(event_name, event_function, unique_id=1925)
+
+    return fs
+
+
+def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> AWSRequest:
+    if "token" not in properties:
+        raise SignError("Signer set, but token is not available")
+
+    signer_url = properties["uri"].rstrip("/")
+    signer_headers = {"Authorization": f"Bearer {properties['token']}"}
+    signer_body = {
+        "method": request.method,
+        "region": request.context["client_region"],
+        "uri": request.url,
+        "headers": {key: [val] for key, val in request.headers.items()},
+    }
+
+    response = requests.post(f"{signer_url}/v1/aws/s3/sign", headers=signer_headers, json=signer_body)
+    try:
+        response.raise_for_status()
+        response_json = response.json()
+    except HTTPError as e:
+        raise SignError(f"Failed to sign request {response.status_code}: {signer_body}") from e
+
+    for key, value in response_json["headers"].items():
+        request.headers.add_header(key, ", ".join(value))
+
+    request.url = response_json["uri"]
+
+    return request
+
+
+SIGNERS: dict[str, Callable[[Properties, AWSRequest], AWSRequest]] = {"S3V4RestSigner": s3v4_rest_signer}
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 3b13925d5bfd..1661553c8597 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -444,6 +444,9 @@ sensors:
       python-modules:
         - airflow.providers.amazon.aws.sensors.quicksight
 
+filesystems:
+  - airflow.providers.amazon.aws.fs.s3
+
 hooks:
   - integration-name: Amazon Athena
     python-modules:
@@ -736,6 +739,9 @@ additional-extras:
   - name: cncf.kubernetes
     dependencies:
       - apache-airflow-providers-cncf-kubernetes>=7.2.0
+  - name: s3fs
+    dependencies:
+      - s3fs>=2023.9.2
 
 config:
   aws:
diff --git a/airflow/providers/common/io/CHANGELOG.rst b/airflow/providers/common/io/CHANGELOG.rst
new file mode 100644
index 000000000000..651ecdcd6949
--- /dev/null
+++ b/airflow/providers/common/io/CHANGELOG.rst
@@ -0,0 +1,30 @@
+ ..
Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. NOTE TO CONTRIBUTORS: + Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes + and you want to add an explanation to the users on how they are supposed to deal with them. + The changelog is updated and maintained semi-automatically by release manager. + +``apache-airflow-providers-common-io`` + +Changelog +--------- + +1.0.0 +----- +Initial version of the provider. diff --git a/airflow/providers/common/io/__init__.py b/airflow/providers/common/io/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/common/io/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/common/io/operators/__init__.py b/airflow/providers/common/io/operators/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/common/io/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
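Before the operator that wraps this API (added below), here is a hedged usage sketch of ObjectStoragePath as introduced earlier in this change; the bucket names are placeholders, and it assumes the Amazon and Google provider filesystem modules plus valid credentials behind the default connection ids.

# Placeholder buckets; assumes the amazon (s3fs) and google (gcsfs) fs modules are installed
# and that "aws_default" / "google_cloud_default" connections carry working credentials.
from airflow.io.store.path import ObjectStoragePath

src = ObjectStoragePath("s3://example-bucket/raw/data.csv", conn_id="aws_default")
dst = ObjectStoragePath("gs://example-archive/", conn_id="google_cloud_default")

with src.open("wb") as f:  # write through the attached s3fs filesystem
    f.write(b"id,value\n1,42\n")

print(src.stat(), src.size())
src.copy(dst)  # source key is preserved: lands at gs://example-archive/raw/data.csv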
diff --git a/airflow/providers/common/io/operators/file_transfer.py b/airflow/providers/common/io/operators/file_transfer.py new file mode 100644 index 000000000000..bb6973535277 --- /dev/null +++ b/airflow/providers/common/io/operators/file_transfer.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Sequence + +from airflow.io.store.path import ObjectStoragePath +from airflow.models import BaseOperator + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class FileTransferOperator(BaseOperator): + """ + Copies a file from a source to a destination. + + This streams the file from the source to the destination if required + , so it does not need to fit into memory. + + :param src: The source file path or ObjectStoragePath object. + :param dst: The destination file path or ObjectStoragePath object. + :param source_conn_id: The optional source connection id. + :param dest_conn_id: The optional destination connection id. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:FileTransferOperator` + """ + + template_fields: Sequence[str] = ("src", "dst") + + def __init__( + self, + *, + src: str | ObjectStoragePath, + dst: str | ObjectStoragePath, + source_conn_id: str | None = None, + dest_conn_id: str | None = None, + overwrite: bool = False, + **kwargs, + ) -> None: + super().__init__(**kwargs) + + self.src = src + self.dst = dst + self.source_conn_id = source_conn_id + self.dst_conn_id = dest_conn_id + self.overwrite = overwrite + + def execute(self, context: Context) -> None: + src: ObjectStoragePath + dst: ObjectStoragePath + + if isinstance(self.src, str): + src = ObjectStoragePath(self.src, self.source_conn_id) + else: + src = self.src + + if isinstance(self.dst, str): + dst = ObjectStoragePath(self.dst, self.dst_conn_id) + else: + dst = self.dst + + if not self.overwrite: + if dst.exists() and dst.is_file(): + raise ValueError(f"Destination {dst} already exists") + + src.copy(dst) diff --git a/airflow/providers/common/io/provider.yaml b/airflow/providers/common/io/provider.yaml new file mode 100644 index 000000000000..de5ae0935ada --- /dev/null +++ b/airflow/providers/common/io/provider.yaml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +--- +package-name: apache-airflow-providers-common-io +name: Common IO +description: | + `Common IO Provider` + +suspended: false +versions: + - 1.0.0 + +dependencies: + - apache-airflow>=2.8.0 + +integrations: + - integration-name: Common IO + external-doc-url: https://filesystem-spec.readthedocs.io/en/latest/index.html + how-to-guide: + - /docs/apache-airflow-providers-common-io/operators.rst + tags: [software] + +operators: + - integration-name: Common IO + python-modules: + - airflow.providers.common.io.operators.file_transfer diff --git a/airflow/providers/google/cloud/fs/__init__.py b/airflow/providers/google/cloud/fs/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/google/cloud/fs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/google/cloud/fs/gcs.py b/airflow/providers/google/cloud/fs/gcs.py new file mode 100644 index 000000000000..74852d01f725 --- /dev/null +++ b/airflow/providers/google/cloud/fs/gcs.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.providers.google.common.hooks.base_google import GoogleBaseHook + +if TYPE_CHECKING: + from fsspec import AbstractFileSystem + +GCS_TOKEN = "gcs.oauth2.token" +GCS_TOKEN_EXPIRES_AT_MS = "gcs.oauth2.token-expires-at" +GCS_PROJECT_ID = "gcs.project-id" +GCS_ACCESS = "gcs.access" +GCS_CONSISTENCY = "gcs.consistency" +GCS_CACHE_TIMEOUT = "gcs.cache-timeout" +GCS_REQUESTER_PAYS = "gcs.requester-pays" +GCS_SESSION_KWARGS = "gcs.session-kwargs" +GCS_ENDPOINT = "gcs.endpoint" +GCS_DEFAULT_LOCATION = "gcs.default-bucket-location" +GCS_VERSION_AWARE = "gcs.version-aware" + + +schemes = ["gs", "gcs"] + + +def get_fs(conn_id: str | None) -> AbstractFileSystem: + # https://gcsfs.readthedocs.io/en/latest/api.html#gcsfs.core.GCSFileSystem + from gcsfs import GCSFileSystem + + if conn_id is None: + return GCSFileSystem() + + g = GoogleBaseHook(gcp_conn_id=conn_id) + creds = g.get_credentials() + + return GCSFileSystem( + project=g.project_id, + access=g.extras.get(GCS_ACCESS, "full_control"), + token=creds.token, + consistency=g.extras.get(GCS_CONSISTENCY, "none"), + cache_timeout=g.extras.get(GCS_CACHE_TIMEOUT), + requester_pays=g.extras.get(GCS_REQUESTER_PAYS, False), + session_kwargs=g.extras.get(GCS_SESSION_KWARGS, {}), + endpoint_url=g.extras.get(GCS_ENDPOINT), + default_location=g.extras.get(GCS_DEFAULT_LOCATION), + version_aware=g.extras.get(GCS_VERSION_AWARE, "false").lower() == "true", + ) diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index cd80123cca56..98431889b073 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -83,6 +83,7 @@ dependencies: - gcloud-aio-auth>=4.0.0,<5.0.0 - gcloud-aio-bigquery>=6.1.2 - gcloud-aio-storage + - gcsfs>=2023.9.2 - google-ads>=21.2.0 - google-api-core>=2.11.0 - google-api-python-client>=1.6.0 @@ -702,6 +703,9 @@ sensors: python-modules: - airflow.providers.google.cloud.sensors.tasks +filesystems: + - airflow.providers.google.cloud.fs.gcs + hooks: - integration-name: Google Ads python-modules: diff --git a/airflow/providers/installed_providers.txt b/airflow/providers/installed_providers.txt index bd32056d5f32..0d9b03a55e2e 100644 --- a/airflow/providers/installed_providers.txt +++ b/airflow/providers/installed_providers.txt @@ -1,6 +1,7 @@ amazon celery cncf.kubernetes +common.io common.sql daskexecutor docker diff --git a/airflow/providers/microsoft/azure/fs/__init__.py b/airflow/providers/microsoft/azure/fs/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/airflow/providers/microsoft/azure/fs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
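To make the new ``filesystems`` hook-up concrete: a provider ships a module with a ``schemes`` list and a ``get_fs(conn_id)`` factory returning an fsspec ``AbstractFileSystem``, and declares that module under ``filesystems:`` in its ``provider.yaml`` (as the Google entry above and the Azure entry below do). The sketch that follows is a hypothetical minimal module under that contract; the module, scheme and in-memory backend are illustrative only.

.. code-block:: python

    # Hypothetical minimal provider filesystem module following the convention used
    # by gcs.py above and adls.py below: a module-level ``schemes`` list plus ``get_fs``.
    # The "myfs" scheme and the in-memory backend are illustrative placeholders.
    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        from fsspec import AbstractFileSystem

    schemes = ["myfs"]


    def get_fs(conn_id: str | None) -> AbstractFileSystem:
        # A real implementation resolves credentials from the Airflow connection
        # identified by conn_id, as gcs.py does with GoogleBaseHook.
        from fsspec.implementations.memory import MemoryFileSystem

        return MemoryFileSystem()

The providers manager change later in this patch discovers such modules from the ``filesystems:`` entries and checks for the ``get_fs`` attribute, so no further registration is needed.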
diff --git a/airflow/providers/microsoft/azure/fs/adls.py b/airflow/providers/microsoft/azure/fs/adls.py new file mode 100644 index 000000000000..e6eaf258498d --- /dev/null +++ b/airflow/providers/microsoft/azure/fs/adls.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.hooks.base import BaseHook +from airflow.providers.microsoft.azure.utils import get_field + +if TYPE_CHECKING: + from fsspec import AbstractFileSystem + +schemes = ["abfs", "abfss", "adl"] + + +def get_fs(conn_id: str | None) -> AbstractFileSystem: + from adlfs import AzureBlobFileSystem + + if conn_id is None: + return AzureBlobFileSystem() + + conn = BaseHook.get_connection(conn_id) + extras = conn.extra_dejson + + connection_string = get_field( + conn_id=conn_id, conn_type="azure_data_lake", extras=extras, field_name="connection_string" + ) + account_name = get_field( + conn_id=conn_id, conn_type="azure_data_lake", extras=extras, field_name="account_name" + ) + account_key = get_field( + conn_id=conn_id, conn_type="azure_data_lake", extras=extras, field_name="account_key" + ) + sas_token = get_field(conn_id=conn_id, conn_type="azure_data_lake", extras=extras, field_name="sas_token") + tenant = get_field(conn_id=conn_id, conn_type="azure_data_lake", extras=extras, field_name="tenant") + + return AzureBlobFileSystem( + connection_string=connection_string, + account_name=account_name, + account_key=account_key, + sas_token=sas_token, + tenant_id=tenant, + client_id=conn.login, + client_secret=conn.password, + ) diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml index 91d65f5fdc23..0f783a9f3efa 100644 --- a/airflow/providers/microsoft/azure/provider.yaml +++ b/airflow/providers/microsoft/azure/provider.yaml @@ -66,6 +66,7 @@ versions: dependencies: - apache-airflow>=2.5.0 + - adlfs>=2023.9.2 - azure-batch>=8.0.0 - azure-cosmos>=4.0.0 - azure-mgmt-cosmosdb @@ -189,6 +190,9 @@ sensors: python-modules: - airflow.providers.microsoft.azure.sensors.data_factory +filesystems: + - airflow.providers.microsoft.azure.fs.adls + hooks: - integration-name: Microsoft Azure Container Instances python-modules: diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py index a5502bce416e..09d1c07f65ce 100644 --- a/airflow/providers_manager.py +++ b/airflow/providers_manager.py @@ -417,7 +417,7 @@ def __init__(self): self._provider_dict: dict[str, ProviderInfo] = {} # Keeps dict of hooks keyed by connection type self._hooks_dict: dict[str, HookInfo] = {} - + self._fs_set: set[str] = set() self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache() # keeps mapping between connection_types and hook class, package they come from 
self._hook_provider_dict: dict[str, HookClassProvider] = {} @@ -508,6 +508,12 @@ def initialize_providers_hooks(self): self._discover_hooks() self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items())) + @provider_info_cache("filesystems") + def initialize_providers_filesystems(self): + """Lazy initialization of providers filesystems.""" + self.initialize_providers_list() + self._discover_filesystems() + @provider_info_cache("taskflow_decorators") def initialize_providers_taskflow_decorator(self): """Lazy initialization of providers hooks.""" @@ -843,6 +849,14 @@ def _import_info_from_all_hooks(self): # that the main reason why original sorting moved to cli part: # self._connection_form_widgets = dict(sorted(self._connection_form_widgets.items())) + def _discover_filesystems(self) -> None: + """Retrieve all filesystems defined in the providers.""" + for provider_package, provider in self._provider_dict.items(): + for fs_module_name in provider.data.get("filesystems", []): + if _correctness_check(provider_package, fs_module_name + ".get_fs", provider): + self._fs_set.add(fs_module_name) + self._fs_set = set(sorted(self._fs_set)) + def _discover_taskflow_decorators(self) -> None: for name, info in self._provider_dict.items(): for taskflow_decorator in info.data.get("task-decorators", []): @@ -1211,6 +1225,11 @@ def executor_class_names(self) -> list[str]: self.initialize_providers_executors() return sorted(self._executor_class_name_set) + @property + def filesystem_module_names(self) -> list[str]: + self.initialize_providers_filesystems() + return sorted(self._fs_set) + @property def provider_configs(self) -> list[tuple[str, dict[str, Any]]]: self.initialize_providers_configuration() diff --git a/docs/apache-airflow-providers-common-io/changelog.rst b/docs/apache-airflow-providers-common-io/changelog.rst new file mode 100644 index 000000000000..16c2e98ac653 --- /dev/null +++ b/docs/apache-airflow-providers-common-io/changelog.rst @@ -0,0 +1,19 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../../airflow/providers/common/io/CHANGELOG.rst diff --git a/docs/apache-airflow-providers-common-io/commits.rst b/docs/apache-airflow-providers-common-io/commits.rst new file mode 100644 index 000000000000..a5ae8822a20f --- /dev/null +++ b/docs/apache-airflow-providers-common-io/commits.rst @@ -0,0 +1,36 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. 
http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Package apache-airflow-providers-common-io +------------------------------------------ + +Common IO Provider + +This is detailed commit list of changes for versions provider package: ``common.io``. +For high-level changelog, see :doc:`package information including changelog `. + +1.0.0 +..... + +Latest change: 2023-10-19 + +================================================================================================= =========== ======================================================== +Commit Committed Subject +================================================================================================= =========== ======================================================== +================================================================================================= =========== ======================================================== diff --git a/docs/apache-airflow-providers-common-io/index.rst b/docs/apache-airflow-providers-common-io/index.rst new file mode 100644 index 000000000000..5c6c7853f88c --- /dev/null +++ b/docs/apache-airflow-providers-common-io/index.rst @@ -0,0 +1,70 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-common-io`` +======================================= + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Basics + + Home + Changelog + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Transferring a file + Operators + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/common/io/index> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: System tests + + System Tests <_api/tests/system/providers/common/io/index> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Resources + + Example DAGs + PyPI Repository + Installing from sources + +.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits diff --git a/docs/apache-airflow-providers-common-io/installing-providers-from-sources.rst b/docs/apache-airflow-providers-common-io/installing-providers-from-sources.rst new file mode 100644 index 000000000000..b4e730f4ff21 --- /dev/null +++ b/docs/apache-airflow-providers-common-io/installing-providers-from-sources.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../exts/includes/installing-providers-from-sources.rst diff --git a/docs/apache-airflow-providers-common-io/operators.rst b/docs/apache-airflow-providers-common-io/operators.rst new file mode 100644 index 000000000000..7170c1898023 --- /dev/null +++ b/docs/apache-airflow-providers-common-io/operators.rst @@ -0,0 +1,47 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +IO Operators +============= + +These operators perform various operations on a filesystem or object storage. + +.. _howto/operator:FileTransferOperator: + +Transfer a file +~~~~~~~~~~~~~~~ + +Use the :class:`~airflow.providers.common.io.operators.file_transfer.FileTransferOperator` to copy a file from one +location to another. Parameters of the operator are: + +- ``src`` - source path as a str or ObjectStoragePath +- ``dst`` - destination path as a str or ObjectStoragePath +- ``src_conn_id`` - source connection id (default: ``None``) +- ``dst_conn_id`` - destination connection id (default: ``None``) +- ``overwrite`` - overwrite destination (default: ``False``) + +If the ``src`` and the ``dst`` are both on the same object storage, copy will be performed in the object storage. +Otherwise the data will be streamed from the source to the destination. + +The example below shows how to instantiate the SQLExecuteQueryOperator task. + +.. exampleinclude:: /../../tests/system/providers/common/io/example_file_transfer_local_to_s3.py + :language: python + :dedent: 4 + :start-after: [START howto_transfer_local_to_s3] + :end-before: [END howto_transfer_local_to_s3] diff --git a/docs/apache-airflow-providers-common-io/transfer.rst b/docs/apache-airflow-providers-common-io/transfer.rst new file mode 100644 index 000000000000..6ab455605e2c --- /dev/null +++ b/docs/apache-airflow-providers-common-io/transfer.rst @@ -0,0 +1,32 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/transfer:io: + +Transferring a File +=================== + +The IO Provider package operators allow you to transfer files between various +locations, like local filesystem, S3, etc. + +Default Connection ID +~~~~~~~~~~~~~~~~~~~~~ + +IO Operators under this provider make use of the default connection ids associated +with the connection scheme or protocol. This is typically obtained from environment +variables. diff --git a/docs/apache-airflow/core-concepts/index.rst b/docs/apache-airflow/core-concepts/index.rst index c8419b99628d..e8c677fc3a8b 100644 --- a/docs/apache-airflow/core-concepts/index.rst +++ b/docs/apache-airflow/core-concepts/index.rst @@ -40,6 +40,7 @@ Here you can find detailed documentation about each one of the core concepts of sensors taskflow executor/index + objectstorage **Communication** diff --git a/docs/apache-airflow/core-concepts/objectstorage.rst b/docs/apache-airflow/core-concepts/objectstorage.rst new file mode 100644 index 000000000000..493df2675a44 --- /dev/null +++ b/docs/apache-airflow/core-concepts/objectstorage.rst @@ -0,0 +1,227 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. _concepts:objectstorage: + +Object Storage +============== + +.. versionadded:: 2.8.0 + +Airflow provides a generic abstraction on top of object stores, like s3, gcs, and azure blob storage. +This abstraction allows you to use a variety of object storage systems in your DAGs without having to +change you code to deal with every different object storage system. In addition, it allows you to use +most of the standard Python modules, like ``shutil``, that can work with file-like objects. + +Support for a particular object storage system depends on the providers you have installed. For +example, if you have installed the ``apache-airflow-providers-google`` provider, you will be able to +use the ``gcs`` scheme for object storage. Out of the box, Airflow provides support for the ``file`` +scheme. + +.. note:: + Support for s3 requires you to install ``apache-airflow-providers-amazon[s3fs]``. This is because + it depends on ``aiobotocore``, which is not installed by default as it can create dependency + challenges with ``botocore``. + + +.. 
_concepts:basic-use: + +Basic Use +--------- + +To use object storage, you need to instantiate a Path-like (see below) object with the URI of the +object you want to interact with. For example, to point to a bucket in s3, you would do the following: + +.. code-block:: python + + from airflow.io.store.path import ObjectStoragePath + + base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default") # conn_id is optional + + +Listing file-objects: + +.. code-block:: python + + @task + def list_files() -> list(ObjectStoragePath): + files = [] + for f in base.iterdir(): + if f.is_file(): + files.append(f) + + return files + + +Navigating inside a directory tree: + +.. code-block:: python + + base = ObjectStoragePath("s3://my-bucket/") + subdir = base / "subdir" + + # prints ObjectStoragePath("s3://my-bucket/subdir") + print(subdir) + + +Opening a file: + +.. code-block:: python + + @task + def read_file(path: ObjectStoragePath) -> str: + with path.open() as f: + return f.read() + + +Leveraging XCOM, you can pass paths between tasks: + +.. code-block:: python + + @task + def create(path: ObjectStoragePath) -> ObjectStoragePath: + return path / "new_file.txt" + + + @task + def write_file(path: ObjectStoragePath, content: str): + with path.open("wb") as f: + f.write(content) + + + new_file = create(base) + write = write_file(new_file, b"data") + + read >> write + + +.. _concepts:api: + +Path-like API +------------- + +The object storage abstraction is implemented as a `Path-like API `_. +This means that you can mostly use the same API to interact with object storage as you would with a local filesystem. +In this section we only list the differences between the two APIs. Extended operations beyond the standard Path API +, like copying and moving, are listed in the next section. For details about each operation, like what arguments +they take, see the documentation of the :class:`~airflow.io.store.path.ObjectStoragePath` class. + + +stat +^^^^ + +Returns a ``stat_result`` like object that supports the following attributes: ``st_size``, ``st_mtime``, ``st_mode``, +but also acts like a dictionary that can provide additional metadata about the object. For example, for s3 it will, +return the additional keys like: ``['ETag', 'ContentType']``. If your code needs to be portable across different object +store do not rely on the extended metadata. + +.. note:: + While ``stat`` does accept the ``follow_symlinks`` argument, it is not passed on to the object storage backend as + not all object storage does not support symlinks. + + +mkdir +^^^^^ + +Create a directory entry at the specified path or within a bucket/container. For systems that don't have true +directories, it may create a directory entry for this instance only and not affect the real filesystem. + +If ``create_parents`` is ``True`` (the default), any missing parents of this path are created as needed. + + +touch +^^^^^ + +Create an empty file, or update the timestamp. If ``truncate`` is ``True``, the file is truncated, which is the +default. + + +.. _concepts:extended-operations: + +Extended Operations +------------------- + +The following operations are not part of the standard Path API, but are supported by the object storage abstraction. + +ukey +^^^^ + +Hash of file properties, to tell if it has changed. + + +checksum +^^^^^^^^ + +Return the checksum of the file. + + +read_block +^^^^^^^^^^ + +Read a block of bytes from the file. This is useful for reading large files in chunks. 
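Taken together, the ``stat``, ``checksum``, ``ukey`` and ``read_block`` operations described above could be exercised roughly as follows. This is a hedged sketch: the bucket, key and connection id are placeholders, and the call signatures are assumed to mirror the underlying fsspec methods.

.. code-block:: python

    # Rough sketch of the metadata and block-read operations documented above.
    # Bucket, key and connection id are placeholders.
    from airflow.io.store.path import ObjectStoragePath

    path = ObjectStoragePath("s3://my-bucket/data/events.parquet", conn_id="aws_default")

    info = path.stat()
    print(info.st_size, info.st_mtime)  # standard stat_result-like attributes
    # Backend-specific keys (e.g. "ETag", "ContentType" on s3) are available via
    # dictionary-style access, but are not portable across object stores.

    print(path.checksum())  # checksum of the file
    print(path.ukey())      # hash of file properties; changes when the file changes

    header = path.read_block(0, 1024)  # read the first 1 KiB without fetching the whole object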
+ + +du +^^ + +Space used by files and optionally directories within a path. + + +find +^^^^ + +Find files and optionally directories within a path. + + +ls +^^ + +List files within a path. + + +sign +^^^^ + +Create a signed URL representing the given path. Some implementations allow temporary URLs to be generated, as a +way of delegating credentials. + + +copy +^^^^ + +Copy a file from one path to another. If the destination is a directory, the file will be copied into it. If the +destination is a file, it will be overwritten. + +move +^^^^ + +Move a file from one path to another. If the destination is a directory, the file will be moved into it. If the +destination is a file, it will be overwritten. + + +.. _concepts:copying-and-moving: + +Copying and Moving +------------------ + +This documents the expected behavior of the ``copy`` and ``move`` operations, particularly for cross object store (e.g. +file -> s3) behavior. Each method copies or moves files or directories from a ``source`` to a ``target`` location. +The intended behavior is the same as specified by +`fsspec `_. For cross object store directory copying, +Airflow needs to walk the directory tree and copy each file individually. This is done by streaming each file from the +source to the target. diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index 1fd5e157f6a6..b9637b1c149a 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -68,6 +68,8 @@ python dependencies for the provided package. +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | sentry | ``pip install 'apache-airflow[sentry]'`` | Sentry service for application logging and monitoring | +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ +| s3fs | ``pip install 'apache-airflow[s3fs]'`` | Support for S3 as Airflow FS | ++---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | statsd | ``pip install 'apache-airflow[statsd]'`` | Needed by StatsD metrics | +---------------------+-----------------------------------------------------+----------------------------------------------------------------------------+ | virtualenv | ``pip install 'apache-airflow[virtualenv]'`` | Running python tasks in local virtualenv | @@ -283,6 +285,8 @@ These are extras that provide support for integration with external systems via +---------------------+-----------------------------------------------------+--------------------------------------+--------------+ | extra | install command | enables | Preinstalled | +=====================+=====================================================+======================================+==============+ +| common.io | ``pip install 'apache-airflow[common.io]'`` | Core IO Operators | * | ++---------------------+-----------------------------------------------------+--------------------------------------+--------------+ | common.sql | ``pip install 'apache-airflow[common.sql]'`` | Core SQL Operators | * | +---------------------+-----------------------------------------------------+--------------------------------------+--------------+ | ftp | ``pip install 'apache-airflow[ftp]'`` | FTP hooks and operators | * | diff --git 
a/docs/apache-airflow/tutorial/index.rst b/docs/apache-airflow/tutorial/index.rst index 5a6e0b2de073..2c0a4492c071 100644 --- a/docs/apache-airflow/tutorial/index.rst +++ b/docs/apache-airflow/tutorial/index.rst @@ -26,3 +26,4 @@ Once you have Airflow up and running with the :doc:`/start`, these tutorials are fundamentals taskflow pipeline + objectstorage diff --git a/docs/apache-airflow/tutorial/objectstorage.rst b/docs/apache-airflow/tutorial/objectstorage.rst new file mode 100644 index 000000000000..89ffe0e8f95d --- /dev/null +++ b/docs/apache-airflow/tutorial/objectstorage.rst @@ -0,0 +1,118 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + + +Object Storage +============== + +This tutorial shows how to use the Object Storage API to manage objects that +reside on object storage, like S3, gcs and azure blob storage. The API is introduced +as part of Airflow 2.8. + +The tutorial covers a simple pattern that is often used in data engineering and +data science workflows: accessing a web api, saving and analyzing the result. For the +tutorial to work you will need to have Duck DB installed, which is a in-process +analytical database. You can do this by running ``pip install duckdb``. The tutorial +makes use of S3 Object Storage. This requires that the amazon provider is installed +including ``s3fs`` by running ``pip install apache-airflow-providers-amazon[s3fs]``. +If you would like to use a different storage provider, you can do so by changing the +url in the ``create_object_storage_path`` function to the appropriate url for your +provider, for example by replacing ``s3://`` with ``gs://`` for Google Cloud Storage. +You will also need the right provider to be installed then. Finally, you will need +``pandas``, which can be installed by running ``pip install pandas``. + + +Creating an ObjectStoragePath +----------------------------- + +The ObjectStoragePath is a path-like object that represents a path on object storage. +It is the fundamental building block of the Object Storage API. + +.. exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py + :language: python + :start-after: [START create_object_storage_path] + :end-before: [END create_object_storage_path] + +The ObjectStoragePath constructor can take an optional connection id. If supplied +it will use the connection to obtain the right credentials to access the backend. +Otherwise it will revert to the default for that backend. + +It is safe to instantiate an ObjectStoragePath at the root of your DAG. Connections +will not be created until the path is used. This means that you can create the +path in the global scope of your DAG and use it in multiple tasks. + +Saving data to Object Storage +----------------------------- + +An ObjectStoragePath behaves mostly like a pathlib.Path object. 
You can +use it to save and load data directly to and from object storage. So, a typical +flow could look like this: + +.. exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py + :language: python + :start-after: [START get_air_quality_data] + :end-before: [END get_air_quality_data] + +The ``get_air_quality_data`` calls the API of the Finnish Meteorological Institute +to obtain the air quality data for the region of Helsinki. It creates a +Pandas DataFrame from the resulting json. It then saves the data to object storage +and converts it on the fly to parquet. + +The key of the object is automatically generated from the logical date of the task, +so we could run this everyday and it would create a new object for each day. We +concatenate this key with the base path to create the full path to the object. Finally, +after writing the object to storage, we return the path to the object. This allows +us to use the path in the next task. + +Analyzing the data +------------------ + +In understanding the data, you typically want to analyze it. Duck DB is a great +tool for this. It is an in-process analytical database that allows you to run +SQL queries on data in memory. + +Because the data is already in parquet format, we can use the ``read_parquet`` and +because both Duck DB and the ObjectStoragePath use ``fsspec`` we can register the +backend of the ObjectStoragePath with Duck DB. ObjectStoragePath exposes the ``fs`` +property for this. We can then use the ``register_filesystem`` function from Duck DB +to register the backend with Duck DB. + +In Duck DB we can then create a table from the data and run a query on it. The +query is returned as a dataframe, which could be used for further analysis or +saved to object storage. + +.. exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py + :language: python + :start-after: [START analyze] + :end-before: [END analyze] + +You might note that the ``analyze`` function does not know the original +path to the object, but that it is passed in as a parameter and obtained +through XCom. You do not need to re-instantiate the Path object. Also +the connection details are handled transparently. + +Putting it all together +----------------------- + +The final DAG looks like this, which wraps things so that we can run it: + +.. 
exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py + :language: python + :start-after: [START tutorial] + :end-before: [END tutorial] diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 741ed6b67d9c..fdb8f053aff8 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -510,6 +510,7 @@ Dsn dsn dttm dtypes +du durations dylib Dynamodb @@ -1617,6 +1618,7 @@ tzinfo UA ui uid +ukey ulimit Umask umask diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index de71df10f7c2..772fa4f78a19 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -269,6 +269,13 @@ "cross-providers-deps": [], "excluded-python-versions": [] }, + "common.io": { + "deps": [ + "apache-airflow>=2.8.0" + ], + "cross-providers-deps": [], + "excluded-python-versions": [] + }, "common.sql": { "deps": [ "apache-airflow>=2.5.0", @@ -409,6 +416,7 @@ "gcloud-aio-auth>=4.0.0,<5.0.0", "gcloud-aio-bigquery>=6.1.2", "gcloud-aio-storage", + "gcsfs>=2023.9.2", "google-ads>=21.2.0", "google-api-core>=2.11.0", "google-api-python-client>=1.6.0", @@ -550,6 +558,7 @@ "microsoft.azure": { "deps": [ "adal>=1.2.7", + "adlfs>=2023.9.2", "apache-airflow>=2.5.0", "azure-batch>=8.0.0", "azure-cosmos>=4.0.0", diff --git a/images/breeze/output_build-docs.svg b/images/breeze/output_build-docs.svg index 3bfd58210de4..6bb4ceffe65a 100644 --- a/images/breeze/output_build-docs.svg +++ b/images/breeze/output_build-docs.svg @@ -163,10 +163,10 @@ [OPTIONS] [all-providers | providers-index | apache-airflow | docker-stack | helm-chart | airbyte | alibaba | amazon | apache.beam | apache.cassandra | apache.drill | apache.druid | apache.flink | apache.hdfs | apache.hive |              apache.impala | apache.kafka | apache.kylin | apache.livy | apache.pig | apache.pinot | apache.spark | apache.sqoop |  -apprise | arangodb | asana | atlassian.jira | celery | cloudant | cncf.kubernetes | common.sql | daskexecutor |        -databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook | ftp | github |    -google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql |              -microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie |     +apprise | arangodb | asana | atlassian.jira | celery | cloudant | cncf.kubernetes | common.io | common.sql |           +daskexecutor | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    +ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql +microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie |   oracle | pagerduty | papermill | plexus | postgres | presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram | trino | vertica | yandex |      zendesk]...                                                                                                            
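Circling back to the DuckDB step in the tutorial above, the registration it describes could look roughly like the sketch below. It assumes ``duckdb`` and the relevant object-storage provider are installed; the bucket, key and table name are placeholders.

.. code-block:: python

    # Hedged sketch of registering an ObjectStoragePath's fsspec backend with DuckDB,
    # as described in the tutorial above. Bucket, key and table name are placeholders.
    import duckdb

    from airflow.io.store.path import ObjectStoragePath

    path = ObjectStoragePath("s3://my-bucket/air_quality/2023-10-19.parquet", conn_id="aws_default")

    conn = duckdb.connect(database=":memory:")
    conn.register_filesystem(path.fs)  # ObjectStoragePath exposes its fsspec filesystem via .fs

    # The path renders as a full URI, so DuckDB can read it through the registered filesystem.
    conn.execute(f"CREATE OR REPLACE TABLE air_quality AS SELECT * FROM read_parquet('{path}')")
    df = conn.execute("SELECT count(*) AS n FROM air_quality").df()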
diff --git a/images/breeze/output_release-management_add-back-references.svg b/images/breeze/output_release-management_add-back-references.svg index b5fc79d1c637..ebcd1572ef1b 100644 --- a/images/breeze/output_release-management_add-back-references.svg +++ b/images/breeze/output_release-management_add-back-references.svg @@ -1,4 +1,4 @@ - + [OPTIONS] [all-providers | apache-airflow | docker-stack | helm-chart | airbyte | alibaba | amazon | apache.beam |     apache.cassandra | apache.drill | apache.druid | apache.flink | apache.hdfs | apache.hive | apache.impala |            apache.kafka | apache.kylin | apache.livy | apache.pig | apache.pinot | apache.spark | apache.sqoop | apprise |        -arangodb | asana | atlassian.jira | celery | cloudant | cncf.kubernetes | common.sql | daskexecutor | databricks |     -datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook | ftp | github | google | grpc | -hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp |             -microsoft.winrm | mongo | mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | -papermill | plexus | postgres | presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack -smtp | snowflake | sqlite | ssh | tableau | tabular | telegram | trino | vertica | yandex | zendesk]...              - -Command to add back references for documentation to make it backward compatible. - -╭─ Add Back References to Docs â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â•® -│*--airflow-site-directory-aLocal directory path of cloned airflow-site repo.(DIRECTORY)[required]│ -╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ -╭─ Common options â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â•® -│--verbose-vPrint verbose information about performed steps.│ -│--dry-run-DIf dry-run is set, commands are only printed, not executed.│ -│--help-hShow this message and exit.│ -╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ +arangodb | asana | atlassian.jira | celery | cloudant | cncf.kubernetes | common.io | common.sql | daskexecutor |      +databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook | ftp | github |    +google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql |              +microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie |     +oracle | pagerduty | papermill | plexus | postgres | presto | redis | salesforce | samba | segment | sendgrid | sftp | +singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram | trino | vertica | yandex |      +zendesk]...                                                                                                            
+ +Command to add back references for documentation to make it backward compatible. + +╭─ Add Back References to Docs â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â•® +│*--airflow-site-directory-aLocal directory path of cloned airflow-site repo.(DIRECTORY)[required]│ +╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ +╭─ Common options â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â”€â•® +│--verbose-vPrint verbose information about performed steps.│ +│--dry-run-DIf dry-run is set, commands are only printed, not executed.│ +│--help-hShow this message and exit.│ +╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/images/breeze/output_release-management_generate-issue-content-providers.svg b/images/breeze/output_release-management_generate-issue-content-providers.svg index fe19f7b813e9..10139cafc51e 100644 --- a/images/breeze/output_release-management_generate-issue-content-providers.svg +++ b/images/breeze/output_release-management_generate-issue-content-providers.svg @@ -144,12 +144,12 @@ [OPTIONS] [apache-airflow | docker-stack | helm-chart | airbyte | alibaba | amazon | apache.beam | apache.cassandra |  apache.drill | apache.druid | apache.flink | apache.hdfs | apache.hive | apache.impala | apache.kafka | apache.kylin | apache.livy | apache.pig | apache.pinot | apache.spark | apache.sqoop | apprise | arangodb | asana | atlassian.jira |  -celery | cloudant | cncf.kubernetes | common.sql | daskexecutor | databricks | datadog | dbt.cloud | dingding |        -discord | docker | elasticsearch | exasol | facebook | ftp | github | google | grpc | hashicorp | http | imap |        -influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql |     -neo4j | odbc | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | plexus | postgres |   -presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite |    -ssh | tableau | tabular | telegram | trino | vertica | yandex | zendesk]...                                            +celery | cloudant | cncf.kubernetes | common.io | common.sql | daskexecutor | databricks | datadog | dbt.cloud |       +dingding | discord | docker | elasticsearch | exasol | facebook | ftp | github | google | grpc | hashicorp | http |    +imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo |      +mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | plexus |      +postgres | presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake |  +sqlite | ssh | tableau | tabular | telegram | trino | vertica | yandex | zendesk]...                                   Generates content for issue to test the release. 
diff --git a/images/breeze/output_release-management_prepare-provider-documentation.svg b/images/breeze/output_release-management_prepare-provider-documentation.svg index c639e6645d56..a8e4587c4b15 100644 --- a/images/breeze/output_release-management_prepare-provider-documentation.svg +++ b/images/breeze/output_release-management_prepare-provider-documentation.svg @@ -156,12 +156,12 @@ [OPTIONS] [apache-airflow | docker-stack | helm-chart | airbyte | alibaba | amazon | apache.beam | apache.cassandra |  apache.drill | apache.druid | apache.flink | apache.hdfs | apache.hive | apache.impala | apache.kafka | apache.kylin | apache.livy | apache.pig | apache.pinot | apache.spark | apache.sqoop | apprise | arangodb | asana | atlassian.jira |  -celery | cloudant | cncf.kubernetes | common.sql | daskexecutor | databricks | datadog | dbt.cloud | dingding |        -discord | docker | elasticsearch | exasol | facebook | ftp | github | google | grpc | hashicorp | http | imap |        -influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql |     -neo4j | odbc | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | plexus | postgres |   -presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite |    -ssh | tableau | tabular | telegram | trino | vertica | yandex | zendesk]...                                            +celery | cloudant | cncf.kubernetes | common.io | common.sql | daskexecutor | databricks | datadog | dbt.cloud |       +dingding | discord | docker | elasticsearch | exasol | facebook | ftp | github | google | grpc | hashicorp | http |    +imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo |      +mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | plexus |      +postgres | presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake |  +sqlite | ssh | tableau | tabular | telegram | trino | vertica | yandex | zendesk]...                                   Prepare CHANGELOG, README and COMMITS information for providers. 
diff --git a/images/breeze/output_release-management_prepare-provider-packages.svg b/images/breeze/output_release-management_prepare-provider-packages.svg index 3d99765b345a..fa4c36063edd 100644 --- a/images/breeze/output_release-management_prepare-provider-packages.svg +++ b/images/breeze/output_release-management_prepare-provider-packages.svg @@ -141,12 +141,12 @@ [OPTIONS] [apache-airflow | docker-stack | helm-chart | airbyte | alibaba | amazon | apache.beam | apache.cassandra |  apache.drill | apache.druid | apache.flink | apache.hdfs | apache.hive | apache.impala | apache.kafka | apache.kylin | apache.livy | apache.pig | apache.pinot | apache.spark | apache.sqoop | apprise | arangodb | asana | atlassian.jira |  -celery | cloudant | cncf.kubernetes | common.sql | daskexecutor | databricks | datadog | dbt.cloud | dingding |        -discord | docker | elasticsearch | exasol | facebook | ftp | github | google | grpc | hashicorp | http | imap |        -influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql |     -neo4j | odbc | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | plexus | postgres |   -presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite |    -ssh | tableau | tabular | telegram | trino | vertica | yandex | zendesk]...                                            +celery | cloudant | cncf.kubernetes | common.io | common.sql | daskexecutor | databricks | datadog | dbt.cloud |       +dingding | discord | docker | elasticsearch | exasol | facebook | ftp | github | google | grpc | hashicorp | http |    +imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo |      +mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | plexus |      +postgres | presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake |  +sqlite | ssh | tableau | tabular | telegram | trino | vertica | yandex | zendesk]...                                   Prepare sdist/whl packages of Airflow Providers. 
diff --git a/images/breeze/output_release-management_publish-docs.svg b/images/breeze/output_release-management_publish-docs.svg index ad72facea3a0..8108ee78d10a 100644 --- a/images/breeze/output_release-management_publish-docs.svg +++ b/images/breeze/output_release-management_publish-docs.svg @@ -180,10 +180,10 @@ [OPTIONS] [all-providers | providers-index | apache-airflow | docker-stack | helm-chart | airbyte | alibaba | amazon | apache.beam | apache.cassandra | apache.drill | apache.druid | apache.flink | apache.hdfs | apache.hive |              apache.impala | apache.kafka | apache.kylin | apache.livy | apache.pig | apache.pinot | apache.spark | apache.sqoop |  -apprise | arangodb | asana | atlassian.jira | celery | cloudant | cncf.kubernetes | common.sql | daskexecutor |        -databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook | ftp | github |    -google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql |              -microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie |     +apprise | arangodb | asana | atlassian.jira | celery | cloudant | cncf.kubernetes | common.io | common.sql |           +daskexecutor | databricks | datadog | dbt.cloud | dingding | discord | docker | elasticsearch | exasol | facebook |    +ftp | github | google | grpc | hashicorp | http | imap | influxdb | jdbc | jenkins | microsoft.azure | microsoft.mssql +microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openfaas | openlineage | opensearch | opsgenie |   oracle | pagerduty | papermill | plexus | postgres | presto | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau | tabular | telegram | trino | vertica | yandex |      zendesk]...                                                                                                            
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh index 3fa890fc2433..6967afb4a859 100755 --- a/scripts/docker/entrypoint_ci.sh +++ b/scripts/docker/entrypoint_ci.sh @@ -345,7 +345,7 @@ if [[ ${UPGRADE_BOTO=} == "true" ]]; then echo echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}" echo - pip uninstall --root-user-action ignore aiobotocore -y || true + pip uninstall --root-user-action ignore aiobotocore s3fs -y || true pip install --root-user-action ignore --upgrade boto3 botocore pip check fi diff --git a/setup.cfg b/setup.cfg index 76e735b112c9..c63db9049013 100644 --- a/setup.cfg +++ b/setup.cfg @@ -100,6 +100,7 @@ install_requires = flask-login>=0.6.2 flask-session>=0.4.0 flask-wtf>=0.15 + fsspec>=2023.9.2 google-re2>=1.0 graphviz>=0.12 gunicorn>=20.1.0 diff --git a/setup.py b/setup.py index b96364c6c257..2a0846164aa8 100644 --- a/setup.py +++ b/setup.py @@ -497,6 +497,12 @@ def write_version(filename: str = str(AIRFLOW_SOURCES_ROOT / "airflow" / "git_ve "aiobotocore>=2.1.1", ] +s3fs = [ + # This is required for support of S3 file system which uses aiobotocore + # which can have a conflict with boto3 as mentioned above + "s3fs>=2023.9.2", +] + def get_provider_dependencies(provider_name: str) -> list[str]: if provider_name not in PROVIDER_DEPENDENCIES: @@ -523,6 +529,7 @@ def get_unique_dependency_list(req_list_iterable: Iterable[list[str]]): get_provider_dependencies("mysql"), pandas, password, + s3fs, ] ) @@ -568,6 +575,7 @@ def get_unique_dependency_list(req_list_iterable: Iterable[list[str]]): "pandas": pandas, "password": password, "rabbitmq": rabbitmq, + "s3fs": s3fs, "sentry": sentry, "statsd": statsd, "virtualenv": virtualenv, @@ -799,6 +807,7 @@ def sort_extras_dependencies() -> dict[str, list[str]]: # Those providers do not have dependency on airflow2.0 because that would lead to circular dependencies. # This is not a problem for PIP but some tools (pipdeptree) show those as a warning. PREINSTALLED_PROVIDERS = [ + "common.io", "common.sql", "ftp", "http", diff --git a/tests/io/__init__.py b/tests/io/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/io/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/io/store/__init__.py b/tests/io/store/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/io/store/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/io/store/test_store.py b/tests/io/store/test_store.py new file mode 100644 index 000000000000..db1609f283a8 --- /dev/null +++ b/tests/io/store/test_store.py @@ -0,0 +1,236 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import uuid +from unittest import mock + +import pytest +from fsspec.implementations.local import LocalFileSystem +from fsspec.utils import stringify_path + +from airflow.io.store import _STORE_CACHE, ObjectStore, attach +from airflow.io.store.path import ObjectStoragePath +from airflow.utils.module_loading import qualname + +FAKE = "file:///fake" +MNT = "file:///mnt/warehouse" +FOO = "file:///mnt/warehouse/foo" +BAR = FOO + + +class FakeRemoteFileSystem(LocalFileSystem): + id = "fakefs" + auto_mk_dir = True + + @property + def fsid(self): + return self.id + + @classmethod + def _strip_protocol(cls, path) -> str: + path = stringify_path(path) + i = path.find("://") + return path[i + 3 :] if i > 0 else path + + +class TestFs: + def test_alias(self): + store = attach("file", alias="local") + assert isinstance(store.fs, LocalFileSystem) + assert "local" in _STORE_CACHE + + def test_init_objectstoragepath(self): + path = ObjectStoragePath("file://bucket/key/part1/part2") + assert path.bucket == "bucket" + assert path.key == "key/part1/part2" + assert path._protocol == "file" + + path2 = ObjectStoragePath(path / "part3") + assert path2.bucket == "bucket" + assert path2.key == "key/part1/part2/part3" + assert path2._protocol == "file" + + def test_read_write(self): + o = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}") + + with o.open("wb") as f: + f.write(b"foo") + + assert o.open("rb").read() == b"foo" + + o.unlink() + + def test_ls(self): + dirname = str(uuid.uuid4()) + filename = str(uuid.uuid4()) + + d = ObjectStoragePath(f"file:///tmp/{dirname}") + d.mkdir(create_parents=True) + o = d / filename + o.touch() + + data = d.ls() + assert len(data) == 1 + assert data[0]["name"] == o + + data = d.ls(detail=False) + assert data == [o] + + d.unlink(recursive=True) + + assert not o.exists() + + def test_find(self): + dirname = str(uuid.uuid4()) + filename = str(uuid.uuid4()) + + d = 
ObjectStoragePath(f"file:///tmp/{dirname}") + d.mkdir(create_parents=True) + o = d / filename + o.touch() + + data = d.find("") + assert len(data) == 1 + assert data == [o] + + data = d.ls(detail=True) + assert len(data) == 1 + assert data[0]["name"] == o + + d.unlink(recursive=True) + + @pytest.mark.parametrize( + "fn, args, fn2, path, expected_args, expected_kwargs", + [ + ("du", {}, "du", FOO, BAR, {"total": True, "maxdepth": None, "withdirs": False}), + ("exists", {}, "exists", FOO, ObjectStoragePath(BAR), {}), + ("checksum", {}, "checksum", FOO, ObjectStoragePath(BAR), {}), + ("size", {}, "size", FOO, ObjectStoragePath(BAR), {}), + ("is_dir", {}, "isdir", FOO, ObjectStoragePath(BAR), {}), + ("is_file", {}, "isfile", FOO, ObjectStoragePath(BAR), {}), + # ("is_symlink", {}, "islink", FOO, ObjectStoragePath(BAR), {}), + ("touch", {}, "touch", FOO, BAR, {"truncate": True}), + ("mkdir", {"exists_ok": True}, "mkdir", FOO, BAR, {"create_parents": True}), + ("read_text", {}, "read_text", FOO, BAR, {"encoding": None, "errors": None, "newline": None}), + ("read_bytes", {}, "read_bytes", FOO, BAR, {"start": None, "end": None}), + ("rm", {}, "rm", FOO, BAR, {"maxdepth": None, "recursive": False}), + ("rmdir", {}, "rmdir", FOO, BAR, {}), + ("write_bytes", {"data": b"foo"}, "pipe_file", FOO, ObjectStoragePath(BAR), {"value": b"foo"}), + ( + "write_text", + {"data": "foo"}, + "write_text", + FOO, + BAR, + {"value": "foo", "encoding": None, "errors": None, "newline": None}, + ), + ("ukey", {}, "ukey", FOO, BAR, {}), + ], + ) + def test_standard_api(self, fn, args, fn2, path, expected_args, expected_kwargs): + _fs = mock.Mock() + _fs._strip_protocol.return_value = "/" + _fs.conn_id = "fake" + + store = attach(protocol="mock", fs=_fs) + o = ObjectStoragePath(path, store=store) + + getattr(o, fn)(**args) + getattr(store.fs, fn2).assert_called_once_with(expected_args, **expected_kwargs) + + def test_move_local(self): + _from = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}") + _to = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}") + + _from.touch() + _from.move(_to) + assert _to.exists() + assert not _from.exists() + + _to.unlink() + + def test_move_remote(self): + attach("fakefs", fs=FakeRemoteFileSystem()) + + _from = ObjectStoragePath(f"file:///tmp/{str(uuid.uuid4())}") + print(_from) + _to = ObjectStoragePath(f"fakefs:///tmp/{str(uuid.uuid4())}") + print(_to) + + _from.touch() + _from.move(_to) + assert not _from.exists() + assert _to.exists() + + _to.unlink() + + def test_copy_remote_remote(self): + # foo = xxx added to prevent same fs token + attach("ffs", fs=FakeRemoteFileSystem(auto_mkdir=True, foo="bar")) + attach("ffs2", fs=FakeRemoteFileSystem(auto_mkdir=True, foo="baz")) + + dir_src = f"/tmp/{str(uuid.uuid4())}" + dir_dst = f"/tmp/{str(uuid.uuid4())}" + key = "foo/bar/baz.txt" + + # note we are dealing with object storage characteristics + # while working on a local filesystem, so it might feel not intuitive + _from = ObjectStoragePath(f"ffs://{dir_src}") + _from_file = _from / key + _from_file.touch() + assert _from_file.exists() + + _to = ObjectStoragePath(f"ffs2://{dir_dst}") + _from.copy(_to) + + assert _to.exists() + assert _to.is_dir() + assert (_to / _from.key / key).exists() + assert (_to / _from.key / key).is_file() + + _from.unlink(recursive=True) + _to.unlink(recursive=True) + + def test_serde_objectstoragepath(self): + path = "file://bucket/key/part1/part2" + o = ObjectStoragePath(path) + s = o.serialize() + d = ObjectStoragePath.deserialize(s, 1) + + assert s["path"] == 
path + assert o == d + + def test_serde_store(self): + store = attach("file", conn_id="mock") + s = store.serialize() + d = ObjectStore.deserialize(s, 1) + + assert s["protocol"] == "file" + assert s["conn_id"] == "mock" + assert s["filesystem"] is None + assert store == d + + store = attach("localfs", fs=LocalFileSystem()) + s = store.serialize() + d = ObjectStore.deserialize(s, 1) + + assert s["protocol"] == "localfs" + assert s["conn_id"] is None + assert s["filesystem"] == qualname(LocalFileSystem) + assert store == d diff --git a/tests/providers/amazon/aws/fs/__init__.py b/tests/providers/amazon/aws/fs/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/providers/amazon/aws/fs/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/amazon/aws/fs/test_fs.py b/tests/providers/amazon/aws/fs/test_fs.py new file mode 100644 index 000000000000..7a392a28329a --- /dev/null +++ b/tests/providers/amazon/aws/fs/test_fs.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import pytest +import responses +from botocore.awsrequest import AWSRequest + +pytest.importorskip("s3fs") + +TEST_CONN = "aws_test_conn" +TEST_SIGNER_URL = "https://nowhere.to.be.found" +TEST_SIGNER_TOKEN = "TOKEN1234" +TEST_SIGNER_RESP_URL = "https://where.are.you" +TEST_HEADER_KEY = "x-airflow" +TEST_HEADER_VALUE = "payload" +TEST_REQ_URI = "s3://bucket/key" + + +class TestFilesystem: + def test_get_s3fs(self): + from airflow.providers.amazon.aws.fs.s3 import get_fs + + fs = get_fs(conn_id=TEST_CONN) + + assert "s3" in fs.protocol + + @responses.activate + def test_signer(self): + from airflow.providers.amazon.aws.fs.s3 import s3v4_rest_signer + + req = AWSRequest( + method="GET", + url=TEST_REQ_URI, + headers={"x": "y"}, + ) + req.context = {"client_region": "antarctica"} + + responses.add( + responses.POST, + f"{TEST_SIGNER_URL}/v1/aws/s3/sign", + json={ + "uri": TEST_SIGNER_RESP_URL, + "headers": { + TEST_HEADER_KEY: [TEST_HEADER_VALUE], + }, + }, + ) + + req = s3v4_rest_signer( + { + "uri": TEST_SIGNER_URL, + "token": TEST_SIGNER_TOKEN, + }, + req, + ) + + assert req.url == TEST_SIGNER_RESP_URL + assert req.headers[TEST_HEADER_KEY] == TEST_HEADER_VALUE diff --git a/tests/system/providers/common/io/__init__.py b/tests/system/providers/common/io/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/tests/system/providers/common/io/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/common/io/example_file_transfer_local_to_s3.py b/tests/system/providers/common/io/example_file_transfer_local_to_s3.py new file mode 100644 index 000000000000..ff5bfa0471ae --- /dev/null +++ b/tests/system/providers/common/io/example_file_transfer_local_to_s3.py @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import os +import uuid +from datetime import datetime +from typing import cast + +from airflow import DAG +from airflow.decorators import task +from airflow.io.store.path import ObjectStoragePath +from airflow.providers.common.io.operators.file_transfer import FileTransferOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_file_transfer_local_to_s3" + +SAMPLE_TEXT = "This is some sample text." + +TEMP_FILE_PATH = ObjectStoragePath("file:///tmp") + +AWS_BUCKET_NAME = f"bucket-aws-{DAG_ID}-{ENV_ID}".replace("_", "-") +AWS_BUCKET = ObjectStoragePath(f"s3://{AWS_BUCKET_NAME}") + +AWS_FILE_PATH = AWS_BUCKET + + +@task +def create_temp_file() -> ObjectStoragePath: + path = ObjectStoragePath(TEMP_FILE_PATH / str(uuid.uuid4())) + + with path.open("w") as file: + file.write(SAMPLE_TEXT) + + return path + + +@task(trigger_rule=TriggerRule.ALL_DONE) +def delete_temp_file(path: ObjectStoragePath): + path.unlink() + + +@task +def remove_bucket(): + AWS_BUCKET.unlink(recursive=True) + + +with DAG( + dag_id=DAG_ID, + schedule="@once", + start_date=datetime(2021, 1, 1), # Override to match your needs + tags=["example"], + catchup=False, +) as dag: + temp_file = create_temp_file() + temp_file_path = cast(ObjectStoragePath, temp_file) + + # [START howto_transfer_local_to_s3] + transfer = FileTransferOperator(src=temp_file_path, dst=AWS_BUCKET, task_id="transfer") + # [END howto_transfer_local_to_s3] + + temp_file >> transfer >> remove_bucket() >> delete_temp_file(temp_file_path) + + from tests.system.utils.watcher import watcher + + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)
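
For orientation, the block below is a minimal sketch of the object-storage API that the new tests and example files in this diff exercise: attach(), ObjectStoragePath, the file-like open/read/write calls, move(), and unlink(). It uses only calls that appear in tests/io/store/test_store.py and the example DAGs above; the /tmp paths and the "local" alias are illustrative, and it assumes the airflow.io.store modules added by this change are importable in the environment. It is not part of the patch, just an annotated usage summary.

from __future__ import annotations

import uuid

from airflow.io.store import attach
from airflow.io.store.path import ObjectStoragePath

# Optionally register the filesystem backing a protocol under an alias,
# as test_alias does above for the local "file" protocol.
store = attach("file", alias="local")

# Paths are URL-style; the protocol selects the backing fsspec filesystem.
src = ObjectStoragePath(f"file:///tmp/{uuid.uuid4()}")
dst = ObjectStoragePath(f"file:///tmp/{uuid.uuid4()}")

# Write and read through the familiar file-like interface
# (mirrors test_read_write above).
with src.open("wb") as f:
    f.write(b"sample payload")
assert src.open("rb").read() == b"sample payload"

# Move works between paths, and per test_move_remote also between
# different attached stores.
src.move(dst)
assert dst.exists() and not src.exists()

dst.unlink()

The same ObjectStoragePath type is what the example DAG above feeds to FileTransferOperator, where src and dst may point at different protocols (here file:// and s3://), with the transfer delegated to the attached fsspec filesystems.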