
AIP-58: Add Airflow ObjectStore (AFS) #34729

Merged: 83 commits, Oct 27, 2023

Conversation

@bolkedebruin (Contributor) commented Oct 3, 2023

(Link to AIP-58 for convenience. — @uranusjr)

updated: 2023-10-17

This adds the Airflow ObjectStore, a generic abstraction on top of a filesystem or object storage. It's inspired by Databricks FS (dbfs), partially ported from Apache Iceberg, and based on fsspec. The idea is that you can do the following:

from airflow.io.store.path import ObjectStoragePath

base = ObjectStoragePath("s3://warehouse", conn_id="aws_default")

@dag
def my_dag():

  @task
  def load_file(src):
     with src.open("rb") as f:
       return f.read()

  x = load_file(base / "my_data.csv")


# you can use it the same way in an operator

Standard Python file operations work (e.g. see shutil.copyfileobj in the generic FileTransfer operator).
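To make that concrete, here is a minimal sketch (not code from this PR) of streaming between two stores with plain shutil, assuming the ObjectStoragePath API shown above; the bucket names and connection ids are placeholders.

import shutil

from airflow.io.store.path import ObjectStoragePath

# Placeholders: adjust buckets and connection ids to your environment.
src = ObjectStoragePath("s3://warehouse/my_data.csv", conn_id="aws_default")
dst = ObjectStoragePath("gcs://backup/my_data.csv", conn_id="google_cloud_default")

# open() hands back regular file-like objects, so standard library helpers work.
with src.open("rb") as fsrc, dst.open("wb") as fdst:
    shutil.copyfileobj(fsrc, fdst)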

Because this is based on fsspec, any object storage or filesystem implementing fsspec is supported. For now s3, gcs, adl and local filesystems are supported; there are options for others, e.g. webhdfs.

As fsspec brings a key/value mapper, it can be used as an XCom backend in the future. It can also work as a backend for DAGs, so we are not necessarily dependent on git-sync or the like any more.
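For reference, fsspec's key/value mapper is fsspec.get_mapper, which exposes a store as a MutableMapping. A tiny illustration of the idea follows; the bucket/prefix and keys are made up, and the XCom wiring itself is not part of this PR.

import fsspec

# get_mapper() wraps any fsspec-backed location in a dict-like interface.
store = fsspec.get_mapper("s3://my-bucket/xcom")

store["my_dag/my_task/run_1/return_value"] = b'{"rows": 42}'
print(store["my_dag/my_task/run_1/return_value"])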

The astro-sdk abstractions, like load_file, could be built on top.

This is work in progress. An Apache Arrow extension is forthcoming. The intention is to gather feedback :-).

[Screenshot: 2023-10-03 at 18:33:03]

@potiuk @dimberman @ashb @kaxil @BasPH

@boring-cyborg boring-cyborg bot added the area:core-operators Operators, Sensors and hooks within Core Airflow label Oct 3, 2023
@ashb (Member) left a comment

Design wise: I'm not sure about the fs.mount affecting global state -- could we instead do something like this:

s3fs = afs.mount("s3://warehouse", "/mnt/warehouse", conn_id="my_aws")
localfs = afs.mount("file:///tmp", "/mnt/tmp")

@task
def load_file(src):
   with s3fs.open(src) as f:
     f.read()

Edit: clearly we "could", so I guess I'm asking what your thinking was for choosing the current way?

Implementation-wise, I think the _adlfs etc. methods should live in the azure etc. provider -- that way new providers can add their own implementation.

@ashb ashb marked this pull request as draft October 3, 2023 15:21
@bolkedebruin (Contributor, Author) commented Oct 3, 2023

> Design wise: I'm not sure about the fs.mount affecting global state -- could we instead do something like this:
>
> s3fs = afs.mount("s3://warehouse", "/mnt/warehouse", conn_id="my_aws")
> localfs = afs.mount("file:///tmp", "/mnt/tmp")
>
> @task
> def load_file(src):
>    with s3fs.open(src) as f:
>      f.read()
>
> Edit: clearly we "could", so I guess I'm asking what your thinking was for choosing the current way?
>
> Implementation-wise, I think the _adlfs etc. methods should live in the azure etc. provider -- that way new providers can add their own implementation.

Can you explain what your intention for local would be? The idea that I have is to have a global scope and a local scope within a task. So if you mount something within a task it doesn't propagate (by default?) to downstream tasks. However, global mounts would be available across all tasks.

The global scope, in my mind, is useful when you intend to manipulate some files across your tasks, and it allows for easy CI/CD, for example. Note, by the way, that you can do this:

afs.mount("s3://warehouse", "/mnt/s3")
afs.mount("gcs://storage", "/mnt/gcs")

afs.copy("/mnt/s3/data", "/mnt/gcs")

and it will figure out how to deal with the copy properly behind the scenes: streaming when required (X -> Y) or a local copy (X -> X).
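A rough sketch of that dispatch idea (illustrative names only, not the PR's implementation): pick a native copy when both paths resolve to the same filesystem, otherwise stream the bytes.

from fsspec.spec import AbstractFileSystem


def smart_copy(src_fs: AbstractFileSystem, src: str, dst_fs: AbstractFileSystem, dst: str) -> None:
    """Copy src -> dst, streaming only when the stores differ (sketch, not the PR's code)."""
    if src_fs is dst_fs:
        # Same filesystem (X -> X): let the store do a native copy.
        src_fs.copy(src, dst)
    else:
        # Cross filesystem (X -> Y): stream the bytes through the worker.
        with src_fs.open(src, "rb") as fsrc, dst_fs.open(dst, "wb") as fdst:
            while chunk := fsrc.read(8 * 1024 * 1024):
                fdst.write(chunk)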

Edit: as mentioned above, provider-specific code should indeed be moved to the respective provider packages.

Edit 2: I looked at your code again. What is the benefit of having localfs and s3fs separately? That would kind of defeat the purpose of having mount points, and you would need to handle all cross-filesystem issues yourself.

@hussein-awala (Member) left a comment
Looking forward to having this great feature!

We had a good experience with pluggable components (task log handler, executor, secret manager, ...). I wonder if you could focus this PR on the base classes backed by local files and add a configuration to change the fs plugin; the other FS classes and configurations could then be created in their corresponding providers.

@bolkedebruin (Contributor, Author) commented Oct 4, 2023

> Looking forward to having this great feature!
>
> We had a good experience with pluggable components (task log handler, executor, secret manager, ...). I wonder if you could focus this PR on the base classes backed by local files and add a configuration to change the fs plugin; the other FS classes and configurations could then be created in their corresponding providers.

This is, more or less, already the case. I intend to have a discovery mechanism that picks up provider-delivered filesystem abstractions. The local filesystem is supported by default. So the good news is: yes, it will be pluggable.
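A possible shape for such a discovery mechanism, sketched with made-up names (the actual registration hook in the providers may look different): providers register a filesystem factory per URL scheme, with the local filesystem available out of the box.

from __future__ import annotations

from typing import Callable, Dict

from fsspec.implementations.local import LocalFileSystem
from fsspec.spec import AbstractFileSystem

# Hypothetical registry: scheme -> factory taking an optional Airflow conn_id.
_SCHEME_TO_FACTORY: Dict[str, Callable[[str | None], AbstractFileSystem]] = {
    "file": lambda conn_id: LocalFileSystem(),
}


def register_filesystem(scheme: str, factory: Callable[[str | None], AbstractFileSystem]) -> None:
    """What a provider would call (e.g. the Amazon provider registering "s3")."""
    _SCHEME_TO_FACTORY[scheme] = factory


def get_fs(scheme: str, conn_id: str | None = None) -> AbstractFileSystem:
    try:
        return _SCHEME_TO_FACTORY[scheme](conn_id)
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme!r}") from None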

@uranusjr (Member) commented Oct 5, 2023

Is the /mnt path necessary? I wonder if we should instead invisibly mount the fs to somewhere random by default and combine the concept with, say, Dataset, to help abstract away the URL string. This might take some cognitive load off users, similar to how having a global pathlib.Path allows writing relative paths.

Also, one problem I have in getting my head around this is that the example in the top post isn’t very complete. For example, what is src in the @task example supposed to be? Why do we need to mount explicitly, instead of it being done automatically by afs.open?

@bolkedebruin (Contributor, Author) commented Oct 5, 2023

@uranusjr

> Is the /mnt path necessary? I wonder if we should instead invisibly mount the fs to somewhere random by default and combine the concept with, say, Dataset, to help abstract away the URL string. This might take some cognitive load off users, similar to how having a global pathlib.Path allows writing relative paths.

The '/mnt/' path isn't required. It's arbitrary - you could do '/whatever', except that (for now) it cannot be nested within another mount point. It should also be path-like. I'm not sure if mounting somewhere random makes sense (what's the point?), but what could be done is to have the functions check for absolute paths, like s3://warehouse or file:///etc, so that they work with, among others, afs.open() as well. That would make sense. The downside is that conn_id would have to be accepted by each and every method, which makes it non-standard (i.e. not compliant with fsspec).

Edit: Oh, and can you add how you would like to see this function with Dataset? I think I get it a bit, but I cannot entirely envision it yet.

> Also, one problem I have in getting my head around this is that the example in the top post isn’t very complete. For example, what is src in the @task example supposed to be? Why do we need to mount explicitly, instead of it being done automatically by afs.open?

src can be any path that is prefixed with a mount point. In the top post it could be src = '/mnt/warehouse/data.gz'. The idea behind having mount points is that it becomes much easier to test and to separate operations from development. Your code can remain the same throughout and you would just need to adjust the source of the mount to make it work in a different setting.

afs.mount("s3://warehouse", "/warehouse")

# can become

afs.mount("file:://tmp/warehouse", "/warehouse", remount=True)

# code below remains the same

As mentioned above it does make sense to have afs.open accept absolute paths as well so you can work with a different pattern.
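A sketch of what an open() accepting both styles could look like: absolute URLs go straight to fsspec, mount-prefixed paths are resolved against a mount table. Names and internals here are assumptions, not the PR's code.

from urllib.parse import urlsplit

import fsspec

_MOUNTS: dict[str, str] = {}  # mount point -> source, e.g. "/warehouse" -> "s3://warehouse"


def mount(source: str, mount_point: str) -> None:
    _MOUNTS[mount_point.rstrip("/")] = source.rstrip("/")


def afs_open(path: str, mode: str = "rb"):
    if urlsplit(path).scheme:
        # Absolute URL such as "s3://warehouse/data.gz" (conn_id handling glossed over).
        return fsspec.open(path, mode)
    for mount_point, source in _MOUNTS.items():
        if path.startswith(mount_point + "/"):
            return fsspec.open(source + path[len(mount_point):], mode)
    raise FileNotFoundError(f"{path} is not under any mount point")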

@uranusjr (Member) commented Oct 5, 2023

What I’m envisioning is something like

warehouse_mnt = afs.mount("s3://warehouse")  # Can have conn_id too, it’s orthogonal.

@dag
def my_dag():

  @task
  def load_file(src):
     with afs.open(src) as f:
       f.read()

  load_file(warehouse_mnt / "my_data.csv")

Instead of exposing the mount to the user, we encapsulate the data inside the Mount object and expose a Path-like interface to let the user operate on it directly. You can work with the mount directly as well, either by passing a mount point explicitly to mount or by accessing mnt.mount_location (or whatever; it returns the location as a string) and working with that.
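A minimal sketch of such a wrapper (class and attribute names are assumptions, not the PR's actual types):

from __future__ import annotations


class Mount:
    def __init__(self, source: str, conn_id: str | None = None):
        self.source = source.rstrip("/")
        self.conn_id = conn_id  # how credentials are resolved is glossed over here

    @property
    def mount_location(self) -> str:
        """Return the mounted location as a string."""
        return self.source

    def __truediv__(self, key: str) -> str:
        # Returns a plain str for simplicity; the __truediv__ discussion further
        # down favours returning a dedicated path-like type instead.
        return f"{self.source}/{key}"


warehouse_mnt = Mount("s3://warehouse")
assert warehouse_mnt / "my_data.csv" == "s3://warehouse/my_data.csv"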

The Dataset part I’m thinking of now is pretty simple: just make the Mount object inherit from Dataset (or be a Dataset?) so that the object can be used for both purposes without duplicating the URL if you need that. Not that useful, but the two are really the same idea (a reference to some resource), and I feel they shouldn’t be two things.

@bolkedebruin (Contributor, Author) commented Oct 5, 2023

@uranusjr

> What I’m envisioning is something like
>
> warehouse_mnt = afs.mount("s3://warehouse")  # Can have conn_id too, it’s orthogonal.
>
> @dag
> def my_dag():
>
>   @task
>   def load_file(src):
>      with afs.open(src) as f:
>        f.read()
>
>   load_file(warehouse_mnt / "my_data.csv")
>
> Instead of exposing the mount to the user, we encapsulate the data inside the Mount object and expose a Path-like interface to let the user operate on it directly. You can work with the mount directly as well, either by passing a mount point explicitly to mount or by accessing mnt.mount_location (or whatever; it returns the location as a string) and working with that.

I think I like that. If we can use both patterns that would be pretty cool.

> The Dataset part I’m thinking of now is pretty simple: just make the Mount object inherit from Dataset (or be a Dataset?) so that the object can be used for both purposes without duplicating the URL if you need that. Not that useful, but the two are really the same idea (a reference to some resource), and I feel they shouldn’t be two things.

Not entirely sure about this. To me, for now, both are quite different. A dataset points to data, and a mount provides an interface that allows you to manipulate file-like objects. So it's not really a reference to a resource, imho. But maybe I am seeing that wrongly. If you have an example of how you think that would work on the user side, it would help.

@bolkedebruin (Contributor, Author) commented Oct 5, 2023

@uranusjr PTL - both patterns are now supported, @ashb I think your thoughts are now also addressed?

@uranusjr (Member) commented Oct 5, 2023

> Not entirely sure about this. To me, for now, both are quite different. A dataset points to data, and a mount provides an interface that allows you to manipulate file-like objects. So it's not really a reference to a resource, imho. But maybe I am seeing that wrongly. If you have an example of how you think that would work on the user side, it would help.

I see what you mean. The two do have some similarities though: say I want to trigger a(nother) DAG when a file on S3 is modified, I would write something like this:

inp = fs.mount("file://my-input.csv")
out = fs.mount("s3://my-warehouse/file.csv")

out_ds = Dataset("s3://my-warehouse/file.csv")

@task(outlets=[out_ds])
def upload(source, target):
    with fs.open(target, "w") as f:
        f.write(source.read())

upload(inp, out)

but the fact that I need to manually tell Airflow what is done in the task seems a bit awkward, and it seems like Airflow should be able to just do the right thing. But admittedly I haven’t figured out how exactly Airflow should infer that out should be triggered (but not inp!), so maybe that’s premature; we can always figure that out later.

@uranusjr (Member) commented Oct 5, 2023

Also, when do I call unmount? I kind of feel there’s really no good use case to have that function. Or maybe we should have some auto context management instead.

@bolkedebruin (Contributor, Author) commented Oct 5, 2023

@uranusjr

> I see what you mean. The two do have some similarities though: say I want to trigger a(nother) DAG when a file on S3 is modified, I would write something like this:
>
> inp = fs.mount("file://my-input.csv")
> out = fs.mount("s3://my-warehouse/file.csv")
>
> out_ds = Dataset("s3://my-warehouse/file.csv")
>
> @task(outlets=[out_ds])
> def upload(source, target):
>     with fs.open(target, "w") as f:
>         f.write(source.read())
>
> upload(inp, out)
>
> but the fact that I need to manually tell Airflow what is done in the task seems a bit awkward, and it seems like Airflow should be able to just do the right thing. But admittedly I haven’t figured out how exactly Airflow should infer that out should be triggered (but not inp!), so maybe that’s premature; we can always figure that out later.

There seems to be a misconception here. Typically you would not mount a file, but a filesystem (setting aside that you could have something like a GzipFileSystem). So not file:///my-input.csv but more like file:///where/my-input/lives. In that case you would end up with:

local_fs = fs.mount("file:///data", "/data")
remote_fs = fs.mount("s3://my-warehouse", "/warehouse")

out_ds = Dataset(remote_fs / "file.csv")

@task(outlets=[out_ds])
def upload(source, target):
  with fs.open(target, "w"): as f:
    f.write(source.read())

# the more optimized version of this looks like this btw

@task(outlets=[out_ds])
def upload(source, target):
  # fs.copy figures out the most efficient way of copying
  fs.copy(source, target)


upload(local_fs / "my-input.csv", remote_fs / "file.csv")

# or

upload("/data/my-input.csv", "/warehouse/file.csv")

Being able to deal with Dataset parameters should maybe be on the list, although I am not entirely happy with Datasets being defined at parse time instead of runtime.

@bolkedebruin (Contributor, Author)

> Also, when do I call unmount? I kind of feel there’s really no good use case to have that function. Or maybe we should have some auto context management instead.

There is no strong reason right now. It is there for compatibility reasons with filesystem specs. Maybe for cleaning up connections if required.

@uranusjr (Member) commented Oct 6, 2023

re: __truediv__ returning str. In pathlib, Path.__truediv__ returns another Path instance so you can do e.g. path / "foo" / "bar", which users would likely expect. I thought this should return another Mount object, but you pointed out the thing I got wrong: Mount does not represent a file endpoint but more like a volume, so __truediv__ should return something of another type, similar to Path, instead. At this point I feel we should leave that part out of the initial implementation and focus on getting other things right (see below).

That makes sense.

re: unmount. I’m guessing, but perhaps this is similar to @ashb’s concern on global state. Currently it is not at all clear when the user can call cleanup, due to how @task functions are actually executed in separate workers, and how global variables in the DAG file work may not be obvious to users. I feel Airflow should either require a more explicit approach such as

m = Mount("s3://...")  # Does not register anything but simply creates an object in memory.

@task
def foo():
    with m:  # Actually registers the mount.
        ...

The intention is to also allow use outside of intentional DAG creation, so it becomes easier to move from arbitrary code to a DAG. As mentioned, it is inspired by Databricks FS, which does the same, so it comes naturally to people coming from such an environment. Trying to stay close to existing specifications rather than inventing our own is one of the principles behind the idea. So to me the above looks like a bit of an anti-pattern, considering POSIX implementations (e.g. the mount command).

To me it is more intuitive and pythonic to use

afs.mount("s3://", "/warehouse")

# rather than

warehouse = afs.mount("s3://")

afs.ls("/warehouse")

# or

warehouse.ls("/")

# rather than:
with m:
  m.ls("/")

or be magically smarter about this and control the mount registration only inside a particular task context instead. We can draw inspiration from other platforms, such as how Pytest uses argument names to infer fixtures (which have a similar context problem), or how Dagster has @asset(deps=...) to declare that the task needs certain resources in the context.

This is, or should be, already the case. A mount isn't much more (or at least it shouldn't be) than creating an object in memory. So to be clear (and I know it isn't entirely the case now due to underlying implementations): mounting shouldn't do anything (like setting up a connection) until you try to execute something on top of it. If you create it at the DAG-file level it just becomes available within every task, and if you create it within a task it is only available within that task (a sketch of how this scoping could be implemented follows the example below). So:

# globally available across all tasks
afs.mount("s3://", "/warehouse")

@task
def task1():
  # only available within the task
  local_fs = afs.mount("file:///tmp", "/local_fs")

@task
def task2():
  # cannot use "local_fs"
  ...
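One way the global vs. task-local scoping could be implemented is with a ContextVar, so mounts registered inside a running task never leak into other tasks. The sketch below is illustrative only; the task_scoped flag stands in for "called inside a running task", which a real implementation would detect from the execution context.

from __future__ import annotations

from contextvars import ContextVar

_GLOBAL_MOUNTS: dict[str, str] = {}  # registered at DAG-file level, visible to every task
_TASK_MOUNTS: ContextVar[dict[str, str]] = ContextVar("task_mounts", default={})


def mount(source: str, mount_point: str, *, task_scoped: bool = False) -> None:
    if task_scoped:
        mounts = dict(_TASK_MOUNTS.get())  # copy so other task contexts are untouched
        mounts[mount_point] = source
        _TASK_MOUNTS.set(mounts)
    else:
        _GLOBAL_MOUNTS[mount_point] = source


def resolve(mount_point: str) -> str:
    # Task-local mounts shadow global ones; unknown mount points raise KeyError.
    return {**_GLOBAL_MOUNTS, **_TASK_MOUNTS.get()}[mount_point]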

Finally, I think this needs an AIP to document the various design decisions and better describe the high-level view.

I'm okay with doing that after most of the practical details have been discovered and fleshed out. Otherwise we would be mostly arguing on paper. In other words, I think our discussion here forms the basis for such an AIP.

I think we are quite aligned; we just have a preference for a different kind of pattern. That's perfectly fine, and nothing stands in the way of supporting all these patterns. The unmounting feature does seem a bit confusing in a DAG context, which I agree with. In a pre-DAG context (e.g. notebook-style development) it might make more sense, particularly if it is required to clean up connections that might be somewhat expensive operations-wise (say, for example, in a hypothetical MySQLFileSystem). Then either a context manager makes sense, or having the task record which mounts were active at the start and close any that have been added during execution, just to make sure that they do not linger.

@bolkedebruin (Contributor, Author)

@uranusjr sorry I see now that I accidentally edited your comment :-P

@bolkedebruin (Contributor, Author)

@uranusjr a context manager is included now, so you can do:

with afs.mount("s3://warehouse") as mnt:
  mnt.copy(x, y)

and (Pure)Path interface:

mnt = afs.mount("s3://warehouse")

print(mnt / "data" / "cool" / "stuff")

@bolkedebruin bolkedebruin changed the title WIP: Add Airflow FS AIP-58: Add Airflow FS Oct 8, 2023
@bolkedebruin bolkedebruin marked this pull request as ready for review October 8, 2023 10:10
@bolkedebruin bolkedebruin changed the title AIP-58: Add Airflow FS AIP-58: Add Airflow VFS Oct 9, 2023
@jscheffl (Contributor) commented Oct 9, 2023

I was reading the AIP doc in general and feel like revising the PR in parallel is making the discussion a bit too complex. As I added a few comments there, I'm just adding a few here as well, and I would favor having the AIP settled first rather than discussing code and AIP (= concepts) in parallel - it feels a bit inconsistent.

When reading the comments I feel even more strongly that the term "mount" is a bit misplaced. As also commented in the AIP, it gives users the feeling that something is really being done at the OS level. Seeing the code examples by @uranusjr, I would really favor (1) using the Pythonic way of context managers and dropping the idea of mounting (pathlib.Path does not need to mount either), and just operating on some abstract filesystem object. For me there is actually no real need to introduce new mounting concepts if we could use the well-established connection facility in Airflow. Connections are a very good place to abstract backend storage endpoints away from DAG code, and you could easily use existing provider facilities to have something like with Connection(MY_S2_ENDPOINT).filespec(): ... or so.

Not to sound discouraging - I very much like the IO library idea in order to reduce the X-to-Y operators - but I fear that adding a totally new FS/IO concept without integrating the existing Connection/Dataset/Provider core concepts would not make for a consistent product.

Therefore I'd favor first laying out the (general) options for FS concept integration and comparing their pros and cons (maybe with example DAG code) before having a code-level discussion, which is hard to follow.

@bolkedebruin (Contributor, Author) commented Oct 10, 2023

@jens-scheffler-bosch AFS works on a fundamentally different layer than Datasets do. Datasets are currently just metadata pointing to arbitrary data and are treated as such in Airflow. They are also only evaluated at parsing time, which is a major shortcoming in my opinion. AFS gives a consistent, generic API on top of storage. While that might look the same, or closely related, at first, it is not when you consider that Datasets could point to Tables, which are a structured way of representing a consistent set of data, which in turn is backed by a (set of) file(s) in a filesystem residing on storage. As mentioned on the AIP, I do think that bringing Datasets closer to AFS makes sense, but I also think it is out of scope for this AIP - more a "for further research" kind of thing than something to be included now.

On mount points, I am willing to consider letting them go, but I think it is also too early now. In the arguments against them I have only seen "it is confusing for users", where users are DAG authors only. As mentioned, AFS targets DAGs, but also XCom and DAG processing; that perspective seems not to be included when arguing against mount points. Then, on the content level, there are other examples that use a virtual file system (as AFS is). The primary one that comes to mind is Databricks FS (dbfs), which does use mount points virtually. It does not seem to be confusing for their users, so 'user data' does not necessarily point towards 'confusing'. Another reason is to provide efficiency in cross-filesystem operations (for example s3 to gcs) or same-filesystem operations (like s3 to s3) without requiring the user to deal with that. Again, this is not considered when arguing against mount points. Finally, using mount points allows users not to think about Airflow's complexity: by just changing what a mount point points to, your code operates on local storage, s3 or gcs.

I am not sure you read the PR correctly, as it already supports Connections and Providers. gs, adls and s3 are filesystems obtained from their respective Providers, and Connections are used if provided or obtained from the environment - which is how Operators in general should behave (not all of them do). Even if we solely use Paths for dealing with file operations, we need to store connection details somewhere. This needs to be done centrally in order to be able to reuse connections and for efficiency. This could be done in a cache without exposing it to the user, or it could be exposed through something like mount points so they can be referenced by the user.

I intend to keep the PR updated. There are some disadvantages, but it keeps the discussion grounded and allows people to play and experiment with it as well. Otherwise we would just be doing a paper exercise on pros and cons no one has practical experience with. DAG code is in this PR, although it is not even required to be in a DAG(!) - it works the same everywhere, which was the intention.

Note that 'Path' and 'mount points' can coexist. They are not mutually exclusive. In fact, as mentioned above, a Path interface requires central registration. Whether or not that is exposed is up for discussion.

For this PR what I intend to do is:

  1. Refactor so that my_path = afs.Path("s3://warehouse") returns a Path interface and not a Mount.
  2. Make initialization of filesystems happen at the first operation so they are lazily initialized (some underlying implementations connect eagerly); see the sketch below.
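For point 2, a lazy-initialization sketch could look like the following; the class and the conn_id handling are assumptions, and the point is only that no filesystem is created until something touches .fs:

from __future__ import annotations

from functools import cached_property
from urllib.parse import urlsplit

import fsspec


class LazyStore:
    def __init__(self, url: str, conn_id: str | None = None):
        self.url = url
        self.conn_id = conn_id  # mapping conn_id -> storage options is glossed over

    @cached_property
    def fs(self):
        # Nothing connects until the first operation touches .fs.
        return fsspec.filesystem(urlsplit(self.url).scheme or "file")

    def ls(self, path: str = ""):
        return self.fs.ls(f"{self.url.rstrip('/')}/{path}".rstrip("/"))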

@bolkedebruin (Contributor, Author)

Can I have a (final) review @potiuk @Taragolis @uranusjr?

@hussein-awala (Member) left a comment

I was concerned about the serialization of the providers' ObjectStores, but checking the documentation, it looks like all the fsspec objects are safe to serialize.

Great work! It looks good to me, LGTM.
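For the serialization point above, a minimal round-trip check, assuming fsspec filesystem instances pickle cleanly (illustrative only; the memory filesystem stands in for a real object store):

import pickle

import fsspec

fs = fsspec.filesystem("memory")
fs.pipe("/bucket/key", b"payload")  # write a small object

restored = pickle.loads(pickle.dumps(fs))  # roughly what shipping a store between processes needs
assert restored.cat("/bucket/key") == b"payload"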

@hussein-awala hussein-awala added the type:new-feature Changelog: New Features label Oct 27, 2023
@hussein-awala hussein-awala added this to the Airflow 2.8.0 milestone Oct 27, 2023
@hussein-awala (Member)

I added it to the 2.8.0 milestone since the vote can be closed and already has the needed binding votes.

@bolkedebruin bolkedebruin merged commit 04e2fbd into apache:main Oct 27, 2023
65 of 67 checks passed
potiuk added a commit to potiuk/airflow that referenced this pull request Oct 27, 2023
apache#34729 was merged without waiting for static checks and broke the regenerated images.
@bolkedebruin bolkedebruin deleted the fileio branch October 29, 2023 07:22
potiuk added a commit that referenced this pull request Oct 29, 2023
The CI scripts in the `build-image` workflow are taken from main, so when
you add a new (preinstalled) provider to be installed for the PROD image,
the provider added to the file in the PR is not present in the
build-image workflow. Moving the file to airflow/providers is
safe because it does not contain any code - just a list of packages
that the breeze command will use when building the PROD image.

Related to #34729

(cherry picked from commit eb76276)
potiuk added a commit that referenced this pull request Oct 29, 2023
…35191)

The #35099 switched installation of the local venv for Airflow to
released packages, in order to accommodate the case from #34729 where
we locally install a new provider that is preinstalled but has not
yet been released.

This however has a side effect - some k8s tests use the K8S Pod
operator to run the tests, and if this operator has been changed, the
local - modified - version of it is not going to be used for the tests.

Another side effect is that in the case of a new installation, when the
constraint installation does not work (for example, constraints in
main conflict with released Airflow), `pip` starts to
backtrack - and for conflicting requirements it produces strange
results (for example, it attempts to install Airflow 1.10 and fails
due to the "slugify" dependency).

The previous version had another problem - once installed, it never
changed, so if someone used breeze to run k8s tests locally and
iterated over changes to the K8S Pod operator, only the version from
the moment the k8s environment was installed was used.

Both cases are now handled better:

* INSTALL_PROVIDERS_FROM_SOURCES is used as an env variable to make
  sure new providers (including preinstalled providers) are found
  in Airflow sources, not in PyPI.
* The "." is used again to install Airflow, and the -e (editable)
  installation is used for it - so that any changes to the local
  version are used. We are not using constraints for the
  installation.
* Dry run and verbose mode of installing the venv are improved
  a bit - they show better what's going on (and dry_run does
  not interact with the installation in unexpected ways, such as
  deleting the installed venv without recreating it).

We already handled reinstalling the k8s test environment after the
requirements file changes, and caching in CI includes the hash of the
requirements file content - so we do not need to handle venv
reinstallation or caching in CI here. The venv should be appropriately
reinstalled as needed.

(cherry picked from commit ba28334)