
AIP-58: Add Airflow ObjectStore (AFS) #34729

Merged
merged 83 commits on Oct 27, 2023
Changes from all commits (83 commits):
0c456ee
IO
bolkedebruin Sep 28, 2023
d9c92f0
Further work
bolkedebruin Sep 28, 2023
bda96de
Add Airflow FS
bolkedebruin Oct 3, 2023
c064314
Add fsspec dependencies
bolkedebruin Oct 8, 2023
29bf6b8
Move stuff to provider packages
bolkedebruin Oct 4, 2023
ad22ed9
Add fsspec
bolkedebruin Oct 4, 2023
58753fa
Use provider style plugins
bolkedebruin Oct 4, 2023
0c4700c
Add plugin registration
bolkedebruin Oct 4, 2023
f86c090
Move exception inline
bolkedebruin Oct 4, 2023
c7774bd
Clean ups
bolkedebruin Oct 5, 2023
133fc4e
Make FileIO work with connection ids
bolkedebruin Oct 5, 2023
ea5584a
Add simple mounts
bolkedebruin Oct 5, 2023
10d7582
Add simple combinations
bolkedebruin Oct 5, 2023
6082452
Allow unmount to use str or Mount
bolkedebruin Oct 5, 2023
11a1f2d
Pre commit stuff - what a mess that creates :/
bolkedebruin Oct 6, 2023
3d57e55
PY38 fixes
bolkedebruin Oct 6, 2023
fbb151b
Address pre-commit
bolkedebruin Oct 6, 2023
ba88b12
Support contexts and PathLib concatenation
bolkedebruin Oct 6, 2023
390c80d
Add s3fs to devel
bolkedebruin Oct 17, 2023
e8331cc
Use deterministic endpoints and generate fsid if not available
bolkedebruin Oct 7, 2023
3d1595f
Fix table
bolkedebruin Oct 8, 2023
48f47b4
Use PathLike object
bolkedebruin Oct 17, 2023
b2f5e26
Fix mypy and test
bolkedebruin Oct 17, 2023
772b754
Simplify implementaton
bolkedebruin Oct 18, 2023
8a97977
Use ObjectStoragePath directly
bolkedebruin Oct 18, 2023
0a46254
Fix docstrings
bolkedebruin Oct 18, 2023
b6b92b2
Check if samestore (maybe just switch to ObjectStorePath copy)
bolkedebruin Oct 18, 2023
fdcd1ba
Use class name instead of type
bolkedebruin Oct 18, 2023
b62778f
Use shutil for copying between stores
bolkedebruin Oct 18, 2023
45c8183
Make sure to set alias only when not specified
bolkedebruin Oct 18, 2023
1c1c2a9
Use backing copy
bolkedebruin Oct 18, 2023
328eb00
Fix test
bolkedebruin Oct 18, 2023
95da859
Fix test
bolkedebruin Oct 18, 2023
6a1b525
Implement caching for filesystems
bolkedebruin Oct 19, 2023
41785d0
Move FileTransferOperator to provider package
bolkedebruin Oct 19, 2023
44a03ae
Pin dependencies
bolkedebruin Oct 19, 2023
bd8d091
Pin aiobotocore until new release of fsspec
bolkedebruin Oct 19, 2023
6654d59
Address version name
bolkedebruin Oct 19, 2023
7df70f3
Don't copy paste too much
bolkedebruin Oct 19, 2023
cb6f442
Use aws infrastructure for getting a session
bolkedebruin Oct 19, 2023
1e40842
Make sure endpoint_url is honored
bolkedebruin Oct 19, 2023
42e897a
Remove s3fs from main and keep in provider
bolkedebruin Oct 19, 2023
c41dcfd
Use service config
bolkedebruin Oct 19, 2023
42908de
remove s3fs when testing aws
bolkedebruin Oct 19, 2023
94ec2e3
Make sure prod can build
bolkedebruin Oct 19, 2023
6a5de18
Fix tests to not depend on s3fs
bolkedebruin Oct 19, 2023
9d7fcce
Readd s3fs to setup.py
bolkedebruin Oct 19, 2023
8b3b194
Fix issues with docs
bolkedebruin Oct 20, 2023
caf2253
fix link
bolkedebruin Oct 20, 2023
841dbdc
Add example dag
bolkedebruin Oct 20, 2023
96516af
Optimize copy
bolkedebruin Oct 20, 2023
f085180
Extra
bolkedebruin Oct 20, 2023
6dc8c1a
Regen images
bolkedebruin Oct 20, 2023
73a9f57
Fix docs
bolkedebruin Oct 21, 2023
74ae073
Update tests not to be dependent on s3fs
bolkedebruin Oct 21, 2023
b2fd6fc
Moved example test
bolkedebruin Oct 21, 2023
6dae720
Add stat_result as a way to unify info and traditional stat_result
bolkedebruin Oct 21, 2023
7b6a71e
Clean up
bolkedebruin Oct 21, 2023
1f125da
Add extra docs
bolkedebruin Oct 21, 2023
91e0e94
Fix docs
bolkedebruin Oct 21, 2023
acf81f5
Add words
bolkedebruin Oct 21, 2023
671924c
Improve copying
bolkedebruin Oct 22, 2023
b1a4cbb
Upgrade fsspec and relax aiobotocore requirements
bolkedebruin Oct 22, 2023
4dea223
Update docs/apache-airflow/core-concepts/objectstorage.rst
bolkedebruin Oct 21, 2023
002de89
Update docs/apache-airflow/core-concepts/objectstorage.rst
bolkedebruin Oct 21, 2023
159c908
Revert "Upgrade fsspec and relax aiobotocore requirements"
bolkedebruin Oct 22, 2023
958eb51
Relax aiobotocore
bolkedebruin Oct 22, 2023
ec870c6
Make copy work as expected
bolkedebruin Oct 23, 2023
a58000d
Revert "Relax aiobotocore"
bolkedebruin Oct 23, 2023
b217eea
Make rename work only within same store
bolkedebruin Oct 23, 2023
855485a
Fix tests
bolkedebruin Oct 23, 2023
d07d044
Fix test not te reuse alias
bolkedebruin Oct 23, 2023
ed2f928
Ensure templated fields for xcom
bolkedebruin Oct 23, 2023
cb8e4a4
Improve handling of existing directories
bolkedebruin Oct 23, 2023
980c3ee
Set aiobotocore to 2.7.0
bolkedebruin Oct 23, 2023
1f26001
Allow larger versions of aiobotocore
bolkedebruin Oct 23, 2023
8f8912b
Update airflow/providers/amazon/aws/fs/s3.py
bolkedebruin Oct 24, 2023
d7fc935
Add tests for s3fs
bolkedebruin Oct 24, 2023
1af0dc3
Add example dag
bolkedebruin Oct 24, 2023
836131d
Improve example
bolkedebruin Oct 24, 2023
2e1cba6
Improve example
bolkedebruin Oct 24, 2023
88b9216
Add tutorial and improve docs
bolkedebruin Oct 27, 2023
5099f45
Add extra
bolkedebruin Oct 27, 2023
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
@@ -48,6 +48,7 @@ body:
- celery
- cloudant
- cncf-kubernetes
- common-io
- common-sql
- daskexecutor
- databricks
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -830,6 +830,7 @@ jobs:
# pip download --no-deps --dest dist apache-airflow-providers-<PROVIDER>==3.1.0
#
rm -vf dist/apache_airflow_providers_openlineage*.whl
rm -rf dist/apache_airflow_providers_common_io*.whl
- name: "Get all provider extras as AIRFLOW_EXTRAS env variable"
# Extras might be different on S3 so rather than relying on "all" we should get the list of
# packages to be installed from the current provider_dependencies.json file
19 changes: 10 additions & 9 deletions CONTRIBUTING.rst
@@ -671,15 +671,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud,
-deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker,
-druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
-ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
-mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas,
-papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
-tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog,
+dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
+doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
+jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
+microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
+oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
+redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
+snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
+webhdfs, winrm, yandex, zendesk
.. END EXTRAS HERE

Provider packages
4 changes: 2 additions & 2 deletions Dockerfile.ci
@@ -1007,7 +1007,7 @@ if [[ ${UPGRADE_BOTO=} == "true" ]]; then
echo
echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}"
echo
-pip uninstall --root-user-action ignore aiobotocore -y || true
+pip uninstall --root-user-action ignore aiobotocore s3fs -y || true
pip install --root-user-action ignore --upgrade boto3 botocore
pip check
fi
@@ -1468,7 +1468,7 @@ RUN echo "Airflow version: ${AIRFLOW_VERSION}"
# Without grpcio-status limit, pip gets into very long backtracking
# We should attempt to remove it in the future
#
-ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0"
+ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0 aiobotocore>=2.7.0"
ARG UPGRADE_TO_NEWER_DEPENDENCIES="false"
ARG VERSION_SUFFIX_FOR_PYPI=""

19 changes: 10 additions & 9 deletions INSTALL
@@ -98,15 +98,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam,
apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud,
-deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker,
-druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
-ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
-mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas,
-papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
-tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog,
+dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
+doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
+jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
+microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
+oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
+redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
+snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
+webhdfs, winrm, yandex, zendesk
# END EXTRAS HERE

# For installing Airflow in development environments - see CONTRIBUTING.rst
135 changes: 135 additions & 0 deletions airflow/example_dags/tutorial_objectstorage.py
@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

# [START tutorial]
# [START import_module]
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.store.path import ObjectStoragePath

# [END import_module]

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}

# [START create_object_storage_path]
base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
# [END create_object_storage_path]


# [START instantiate_dag]
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
"""
### Object Storage Tutorial Documentation
This is a tutorial DAG to showcase the usage of the Object Storage API.
Documentation that goes along with the Airflow Object Storage tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
"""
# [END instantiate_dag]
import duckdb
import pandas as pd

# [START get_air_quality_data]
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
"""
#### Get Air Quality Data
This task gets air quality data from the Finnish Meteorological Institute's
open data API. The data is saved as parquet.
"""
execution_date = kwargs["logical_date"]
start_time = kwargs["data_interval_start"]

params = {
"format": "json",
"precision": "double",
"groupareas": "0",
"producer": "airquality_urban",
"area": "Uusimaa",
"param": ",".join(aq_fields.keys()),
"starttime": start_time.isoformat(timespec="seconds"),
"endtime": execution_date.isoformat(timespec="seconds"),
"tz": "UTC",
}

response = requests.get(API, params=params)
response.raise_for_status()

# ensure the bucket exists
base.mkdir(exists_ok=True)

formatted_date = execution_date.format("YYYYMMDD")
path = base / f"air_quality_{formatted_date}.parquet"

df = pd.DataFrame(response.json()).astype(aq_fields)
with path.open("wb") as file:
df.to_parquet(file)

return path

# [END get_air_quality_data]

# [START analyze]
@task
def analyze(path: ObjectStoragePath, **kwargs):
"""
#### Analyze
This task analyzes the air quality data, prints the results
"""
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

print(df2.head())

# [END analyze]

# [START main_flow]
obj_path = get_air_quality_data()
analyze(obj_path)
# [END main_flow]


# [START dag_invocation]
tutorial_objectstorage()
# [END dag_invocation]
# [END tutorial]
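For reviewers who want to try the new API without AWS credentials, a minimal sketch using the same `ObjectStoragePath` calls as the tutorial DAG above, but against the builtin `file` scheme. The `/tmp/airflow_objectstorage_demo` location is only an illustrative choice, not something this PR ships.

```python
from airflow.io.store.path import ObjectStoragePath

# the "file" scheme is registered out of the box (see airflow/io/__init__.py below),
# so no provider package or connection is required
base = ObjectStoragePath("file:///tmp/airflow_objectstorage_demo/")
base.mkdir(exists_ok=True)  # same keyword argument the tutorial DAG uses

# paths compose with "/" like pathlib, and open() yields a file-like object
target = base / "hello.txt"
with target.open("wb") as f:
    f.write(b"hello object storage")

with target.open("rb") as f:
    print(f.read())
```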
90 changes: 90 additions & 0 deletions airflow/io/__init__.py
@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import (
    TYPE_CHECKING,
    Callable,
)

from fsspec.implementations.local import LocalFileSystem

from airflow.compat.functools import cache
from airflow.providers_manager import ProvidersManager
from airflow.stats import Stats
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
    from fsspec import AbstractFileSystem

log = logging.getLogger(__name__)


def _file(_: str | None) -> LocalFileSystem:
    return LocalFileSystem()


# builtin supported filesystems
_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None], AbstractFileSystem]] = {
    "file": _file,
}


@cache
def _register_filesystems() -> dict[str, Callable[[str | None], AbstractFileSystem]]:
    scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
    with Stats.timer("airflow.io.load_filesystems") as timer:
        manager = ProvidersManager()
        for fs_module_name in manager.filesystem_module_names:
            fs_module = import_string(fs_module_name)
            for scheme in getattr(fs_module, "schemes", []):
                if scheme in scheme_to_fs:
                    log.warning("Overriding scheme %s for %s", scheme, fs_module_name)

                method = getattr(fs_module, "get_fs", None)
                if method is None:
                    raise ImportError(f"Filesystem {fs_module_name} does not have a get_fs method")
                scheme_to_fs[scheme] = method

    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
    return scheme_to_fs


def get_fs(scheme: str, conn_id: str | None = None) -> AbstractFileSystem:
    """
    Get a filesystem by scheme.

    :param scheme: the scheme to get the filesystem for
    :param conn_id: the airflow connection id to use
    :return: the filesystem method
    """
    filesystems = _register_filesystems()
    try:
        return filesystems[scheme](conn_id)
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme}")


def has_fs(scheme: str) -> bool:
    """
    Check if a filesystem is available for a scheme.

    :param scheme: the scheme to check
    :return: True if a filesystem is available for the scheme
    """
    return scheme in _register_filesystems()
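The registration contract above is implicit, so spelling it out: `get_fs`/`has_fs` resolve schemes through `_register_filesystems()`, and a provider participates by exposing a filesystem module with a `schemes` list and a `get_fs(conn_id)` callable. A small sketch based on the code shown here; the `myfs` scheme and `MyFileSystem` class are hypothetical and not part of this PR.

```python
from fsspec.implementations.local import LocalFileSystem

from airflow.io import get_fs, has_fs

# the builtin "file" scheme resolves to fsspec's LocalFileSystem
fs = get_fs("file")
assert isinstance(fs, LocalFileSystem)

# provider-contributed schemes only appear once that provider is installed
print(has_fs("s3"))

# A provider filesystem module would look roughly like this (hypothetical):
#
#   schemes = ["myfs"]
#
#   def get_fs(conn_id: str | None) -> AbstractFileSystem:
#       return MyFileSystem(conn_id=conn_id)
#
# _register_filesystems() discovers it via ProvidersManager.filesystem_module_names
# and raises ImportError if the module lacks a get_fs callable.
```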