
Add tutorial and improve docs
bolkedebruin committed Oct 27, 2023
1 parent 2e1cba6 commit 88b9216
Showing 3 changed files with 125 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/example_dags/tutorial_objectstorage.py
@@ -42,7 +42,9 @@
"TRSC_PT1H_avg": "float64",
}

# [START create_object_storage_path]
base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
# [END create_object_storage_path]


# [START instantiate_dag]
@@ -58,7 +60,7 @@ def tutorial_objectstorage():
This is a tutorial DAG to showcase the usage of the Object Storage API.
Documentation that goes along with the Airflow Object Storage tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_objectstorage_api.html)
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
"""
# [END instantiate_dag]
import duckdb
@@ -104,6 +106,7 @@ def get_air_quality_data(**kwargs) -> ObjectStoragePath:

# [END get_air_quality_data]

# [START analyze]
@task
def analyze(path: ObjectStoragePath, **kwargs):
"""
@@ -118,6 +121,8 @@ def analyze(path: ObjectStoragePath, **kwargs):

print(df2.head())

# [END analyze]

# [START main_flow]
obj_path = get_air_quality_data()
analyze(obj_path)
1 change: 1 addition & 0 deletions docs/apache-airflow/tutorial/index.rst
@@ -26,3 +26,4 @@ Once you have Airflow up and running with the :doc:`/start`, these tutorials are
fundamentals
taskflow
pipeline
objectstorage
118 changes: 118 additions & 0 deletions docs/apache-airflow/tutorial/objectstorage.rst
@@ -0,0 +1,118 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Object Storage
==============

This tutorial shows how to use the Object Storage API to manage objects that
reside on object storage, such as S3, GCS, and Azure Blob Storage. The API is
introduced as part of Airflow 2.8.

The tutorial covers a simple pattern that is often used in data engineering and
data science workflows: accessing a web API, saving the result, and analyzing it.
For the tutorial to work you will need DuckDB installed, which is an in-process
analytical database. You can do this by running ``pip install duckdb``. The tutorial
makes use of S3 object storage, which requires the Amazon provider to be installed
together with ``s3fs``, by running ``pip install apache-airflow-providers-amazon[s3fs]``.
If you would like to use a different storage provider, you can do so by changing the
URL in the ``create_object_storage_path`` example to the appropriate URL for your
provider, for example by replacing ``s3://`` with ``gs://`` for Google Cloud Storage.
You will then also need the corresponding provider installed. Finally, you will need
``pandas``, which can be installed by running ``pip install pandas``.
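
For example, a minimal sketch pointing the tutorial at Google Cloud Storage
instead of S3 (the bucket name is illustrative; ``google_cloud_default`` is
Airflow's standard Google connection id):

.. code-block:: python

    from airflow.io.path import ObjectStoragePath

    # Hypothetical bucket; needs the Google provider and its fsspec
    # backend (gcsfs) installed.
    base = ObjectStoragePath("gs://my-tutorial-bucket/", conn_id="google_cloud_default")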


Creating an ObjectStoragePath
-----------------------------

The ObjectStoragePath is a path-like object that represents a path on object storage.
It is the fundamental building block of the Object Storage API.

.. exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START create_object_storage_path]
:end-before: [END create_object_storage_path]

The ObjectStoragePath constructor can take an optional connection id. If supplied,
it will use the connection to obtain the right credentials to access the backend.
Otherwise it will fall back to the default credentials for that backend.

It is safe to instantiate an ObjectStoragePath at the root of your DAG. Connections
will not be created until the path is used. This means that you can create the
path in the global scope of your DAG and use it in multiple tasks.
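
A minimal sketch of both variants, with lazy connection creation (the bucket
name is illustrative and assumes an existing ``aws_default`` connection):

.. code-block:: python

    from airflow.io.path import ObjectStoragePath

    # Credentials come from the "aws_default" connection.
    base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

    # Without a connection id, the backend falls back to its defaults
    # (for s3fs, e.g. environment variables or an instance profile).
    default_base = ObjectStoragePath("s3://airflow-tutorial-data/")

    # Instantiation at module level is cheap: no connection is made
    # until the path is actually used, for example inside a task.
    print(base.exists())  # the connection is created here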

Saving data to Object Storage
-----------------------------

An ObjectStoragePath behaves mostly like a ``pathlib.Path`` object. You can
use it to save and load data directly to and from object storage. A typical
flow could look like this:

.. exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START get_air_quality_data]
:end-before: [END get_air_quality_data]

The ``get_air_quality_data`` task calls the API of the Finnish Meteorological
Institute to obtain the air quality data for the region of Helsinki. It creates
a Pandas DataFrame from the resulting JSON, then saves the data to object
storage, converting it to Parquet on the fly.

The key of the object is automatically generated from the logical date of the task,
so we could run this every day and it would create a new object for each day. We
concatenate this key with the base path to create the full path to the object. Finally,
after writing the object to storage, we return the path so that the next task can
use it.
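
Condensed, that flow looks roughly like this (a sketch, not the full task; the
endpoint and the key format are placeholders for the FMI API and the logical
date handling in the example):

.. code-block:: python

    import pandas as pd
    import requests

    from airflow.io.path import ObjectStoragePath

    base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")


    def save_air_quality_data(logical_date: str) -> ObjectStoragePath:
        # Placeholder URL standing in for the FMI air quality API.
        response = requests.get("https://example.org/air-quality.json", timeout=30)
        df = pd.DataFrame(response.json())

        # Key derived from the logical date: one new object per daily run.
        path = base / f"air_quality_{logical_date}.parquet"
        with path.open("wb") as file:
            df.to_parquet(file)  # converted to Parquet on the fly
        return path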

Analyzing the data
------------------

To understand the data, you typically want to analyze it. DuckDB is a great
tool for this: an in-process analytical database that allows you to run
SQL queries on data in memory.

Because the data is already in Parquet format, we can use DuckDB's ``read_parquet``
function, and because both DuckDB and ObjectStoragePath use ``fsspec``, we can
register the backend of the ObjectStoragePath with DuckDB. ObjectStoragePath
exposes the ``fs`` property for this; we pass it to DuckDB's
``register_filesystem`` function.
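
A minimal sketch of the registration (the key is illustrative; in the DAG,
``path`` is the ObjectStoragePath returned by the previous task):

.. code-block:: python

    import duckdb

    from airflow.io.path import ObjectStoragePath

    path = ObjectStoragePath("s3://airflow-tutorial-data/air_quality.parquet", conn_id="aws_default")

    conn = duckdb.connect(database=":memory:")

    # path.fs is the underlying fsspec filesystem; registering it lets
    # DuckDB resolve the object storage URL with the same credentials.
    conn.register_filesystem(path.fs)

    # DuckDB can now read the Parquet data directly from object storage.
    df = conn.execute(f"SELECT * FROM read_parquet('{path}') LIMIT 5").df()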

In DuckDB we can then create a table from the data and run a query on it. The
query result is returned as a DataFrame, which could be used for further
analysis or saved to object storage.

.. exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START analyze]
:end-before: [END analyze]

Note that the ``analyze`` function does not know the original path to the
object; it is passed in as a parameter and delivered through XCom. You do not
need to re-instantiate the path object, and the connection details are handled
transparently.
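
A TaskFlow sketch of that hand-off (task names are illustrative):

.. code-block:: python

    from airflow.decorators import task
    from airflow.io.path import ObjectStoragePath

    base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")


    @task
    def produce() -> ObjectStoragePath:
        # The returned path is pushed to XCom.
        return base / "air_quality.parquet"


    @task
    def consume(path: ObjectStoragePath):
        # Rehydrated from XCom: no re-instantiation needed, and the
        # connection details travel with the path.
        print(path.exists())


    consume(produce())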

Putting it all together
-----------------------

The final DAG looks like this, wrapping everything up so that it can be run:

.. exampleinclude:: /../../airflow/example_dags/tutorial_objectstorage.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
