AIP-58: Add Airflow ObjectStore (AFS) #34729
Conversation
Design-wise: I'm not sure about the `fs.mount` affecting global state -- could we instead do something like this:

```python
s3fs = afs.mount("s3://warehouse", "/mnt/warehouse", conn_id="my_aws")
localfs = afs.mount("file:///tmp", "/mnt/tmp")


@task
def load_file(src):
    with s3fs.open(src) as f:
        f.read()
```
Edit: clearly we "could", so I guess I'm asking what was your thinking for choosing the current way?
Implementation-wise I think the `_adlfs` etc. methods should live in the azure etc. provider -- that way new providers can add in their own implementation.
Can you explain what your intention for "local" would be? The idea that I have is to have a global scope and a local scope within a task. So if you mount something within a task it doesn't propagate (by default?) to downstream tasks. However, global mounts would be available across all tasks. The global scope in my mind is useful when you intend to manipulate some files across your tasks, and it allows for easy CI/CD for example. Note btw that you can do this:

```python
afs.mount("s3://warehouse", "/mnt/s3")
afs.mount("gcs://storage", "/mnt/gcs")
afs.copy("/mnt/s3/data", "/mnt/gcs")
```

and it will figure out how to deal with the copy properly behind the scenes: streaming when required (X -> Y) or a local copy (X -> X).

Edit: as mentioned above, indeed provider-specific code should be moved to their respective provider packages.

Edit 2: Looked at your code again. What is the benefit of having
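For what it's worth, a minimal sketch of that "figure out how to copy" decision, written directly against fsspec (which AFS builds on) rather than the PR's own `afs.copy`; the function name and logic here are illustrative assumptions, not the actual implementation:

```python
import shutil

from fsspec.core import url_to_fs


def copy_between(src_url: str, dst_url: str) -> None:
    """Copy natively when source and target share a filesystem, stream otherwise."""
    src_fs, src_path = url_to_fs(src_url)
    dst_fs, dst_path = url_to_fs(dst_url)

    if src_fs is dst_fs:
        # Same filesystem (X -> X): let it do a native/server-side copy.
        src_fs.copy(src_path, dst_path)
    else:
        # Different filesystems (X -> Y): stream from one to the other.
        with src_fs.open(src_path, "rb") as src, dst_fs.open(dst_path, "wb") as dst:
            shutil.copyfileobj(src, dst)
```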
Looking forward to having this great feature!
We had a good experience with pluggable components (task log handler, executor, secret manager, ...). I wonder if you could focus the PR on the base classes based on local files and add a configuration to change the fs plugin; then you can create the other FS classes and their configurations in their corresponding providers.
This is, more or less, already the case. I intend to have a discovery mechanism that picks up provider-delivered filesystem abstractions. The local filesystem is supported by default. So the good news is: yes, it will be pluggable.
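For illustration only, a rough sketch of what such a scheme-based discovery registry could look like; `SCHEME_TO_FS` and `get_fs` are hypothetical names, not the PR's actual API (which would source the entries from provider packages):

```python
from typing import Callable, Dict, Optional

from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem

# Providers would register factories for their schemes (e.g. "s3", "gcs", "adl").
# Only the local filesystem is available out of the box in this sketch.
SCHEME_TO_FS: Dict[str, Callable[[Optional[str]], AbstractFileSystem]] = {
    "file": lambda conn_id: LocalFileSystem(),
}


def get_fs(scheme: str, conn_id: Optional[str] = None) -> AbstractFileSystem:
    """Look up the filesystem implementation registered for a URL scheme."""
    try:
        return SCHEME_TO_FS[scheme](conn_id)
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme!r}") from None
```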
Is the `/mnt/` prefix required? Also, one problem I have in getting my head around this is that the example in the top post isn't very complete. For example, what is
The '/mnt/' path isn't required. It's arbitrary - you could do '/whatever', except that (for now) it cannot be nested within another mount point. It should also be path-like. I'm not sure if mounting somewhere random makes sense (what's the point), but what could be done is to have the functions check for absolute paths, like:

```python
afs.mount("s3://warehouse", "/warehouse")
# can become
afs.mount("file:///tmp/warehouse", "/warehouse", remount=True)
# code below remains the same
```

As mentioned above it does make sense to have

Edit: Oh, and can you add how you would like to see this function with Dataset - cause I think I get it a bit but I cannot entirely envision it yet.
What I'm envisioning is something like:

```python
warehouse_mnt = afs.mount("s3://warehouse")  # Can have conn_id too, it's orthogonal.


@dag
def my_dag():
    @task
    def load_file(src):
        with afs.open(src) as f:
            f.read()

    load_file(warehouse_mnt / "my_data.csv")
```

Instead of exposing the mount to the user, we encapsulate the data inside the Mount object and expose a Path-like interface to let the user operate on it directly. You can work with the mount directly as well, either by passing a mount point explicitly to

The Dataset part I'm thinking about now is pretty simple: just make the Mount object inherit from Dataset (or is a Dataset?) so that the object can be used for both purposes without duplicating the URL if you need that. Not that useful, but the two are really the same idea (a reference to some resource) that I feel shouldn't be two things.
I think I like that. If we can use both patterns that would be pretty cool.
Not entirely sure about this. To me, for now, both are quite different. A dataset points to data, and a mount provides an interface that allows you to manipulate file-like objects. So not really a reference to a resource imho. But maybe I am seeing that wrongly. If you have an example of how you think that would work on the user side it would help.
I see what you mean. The two do have some similarities though; say I want to trigger a(nother) DAG when a file on S3 is modified, I would write something like this:

```python
inp = fs.mount("file://my-input.csv")
out = fs.mount("s3://my-warehouse/file.csv")
out_ds = Dataset("s3://my-warehouse/file.csv")


@task(outlets=[out_ds])
def upload(source, target):
    with fs.open(target, "w") as f:
        f.write(source.read())


upload(inp, out)
```

but the fact that I need to manually tell Airflow explicitly what is done in the task seems a bit awkward, and it seems like Airflow should be able to just do the right thing. But admittedly I haven't figured out how exactly Airflow should infer
Also, when do I call unmount? I kind of feel there's really no good use case to have that function. Or maybe we should have some auto context management instead.
There seems to be a misconception here. Typically, you would not mount a file, but a filesystem. Apart from setting aside that you could have something like a

```python
local_fs = fs.mount("file:///data", "/data")
remote_fs = fs.mount("s3://my-warehouse", "/warehouse")
out_ds = Dataset(remote_fs / "file.csv")


@task(outlets=[out_ds])
def upload(source, target):
    with fs.open(target, "w") as f:
        f.write(source.read())


# the more optimized version of this looks like this btw
@task(outlets=[out_ds])
def upload(source, target):
    # fs.copy figures out the most efficient way of copying
    fs.copy(source, target)


upload(local_fs / "my-input.csv", remote_fs / "file.csv")
# or
upload("/data/my-input.csv", "/warehouse/file.csv")
```

Being able to deal with Dataset parameters should maybe be on the list, although I am not entirely happy with the nature of Datasets being defined at parse time instead of runtime.
There is no strong reason right now. It is there for compatibility reasons with filesystem specs. Maybe for cleaning up connections if required.
That makes sense.

```python
m = Mount("s3://...")  # Does not register anything but simply creates an object in memory.


@task
def foo():
    with m:  # Actually registers the mount.
        ...
```

The intention is to also allow use outside of intentional DAG creation, so it becomes easier to move from arbitrary code to a DAG. As mentioned, it is inspired by databricks fs, which does the same, so it comes naturally to people coming from such an environment. Trying to stay close to existing specifications rather than inventing our own is one of the principles behind the idea. So to me the above looks like a bit of an anti-pattern, considering POSIX implementations (e.g. the mount command). To me it is more intuitive and pythonic to use:

```python
afs.mount("s3://", "/warehouse")
# rather than
warehouse = afs.mount("s3://")

afs.ls("/warehouse")
# or
warehouse.ls("/")
# rather than:
with m:
    m.ls("/")
```
This is, or should be, already the case. A mount isn't much more (or at least it shouldn't be) than creating an object in memory. So to be clear (and I know it isn't entirely the case now due to underlying implementations): mounting shouldn't do anything (like setting up a connection) until you try to execute something on top of it. If you create it at the DAG-file level it just becomes available within every task, and if you create it within a task it is just available within that task. So:

```python
# globally available across all tasks
afs.mount("s3://", "/warehouse")


@task
def task1():
    # only available within the task
    local_fs = afs.mount("file:///tmp", "/local_fs")


@task
def task2():
    # cannot use "local_fs"
    ...
```
I'm okay with doing that, after most of the practical details have been discovered and fleshed out. Otherwise we would be mostly arguing on paper. In other words, I think our discussion here forms the basis for such an AIP.

I think we seem to be quite aligned; we just have a preference for a different kind of pattern. That's perfectly fine, and nothing stands in the way of having all these patterns supported. The unmounting feature does seem to be a bit confusing, which I agree with in a DAG context. In a pre-DAG context (e.g. notebook-style development) it might make more sense, particularly if it is required to clean up connections, which might be somewhat expensive operations-wise (say, for example, in a hypothetical MySQLFileSystem). Then either a context manager makes sense, or having the task record what mounts were active at the start and close any that have been added during execution, just to make sure that they do not linger.
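A quick sketch of that last idea (recording active mounts at task start and dropping anything added during execution); the names and the registry are hypothetical, purely to illustrate the bookkeeping:

```python
from contextlib import contextmanager
from typing import Dict, Iterator

# Hypothetical global registry of mount point -> filesystem handle.
ACTIVE_MOUNTS: Dict[str, object] = {}


@contextmanager
def task_mount_scope() -> Iterator[None]:
    """Unmount anything a task added, so task-local mounts do not linger."""
    before = set(ACTIVE_MOUNTS)
    try:
        yield
    finally:
        for mount_point in set(ACTIVE_MOUNTS) - before:
            ACTIVE_MOUNTS.pop(mount_point, None)  # stand-in for afs.unmount(mount_point)
```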
@uranusjr sorry I see now that I accidentally edited your comment :-P
@uranusjr context manager included now, so you can do:

```python
with afs.mount("s3://warehouse") as mnt:
    mnt.copy(x, y)
```

and a (Pure)Path interface:

```python
mnt = afs.mount("s3://warehouse")
print(mnt / "data" / "cool" / "stuff")
```
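To make the combination concrete, a toy stand-in for such a mount object with both behaviours; this is a hedged sketch with made-up internals, not the class from the PR:

```python
import posixpath


class Mount:
    """Toy mount handle supporting `/` joining and `with` usage."""

    def __init__(self, root: str):
        self.root = root.rstrip("/")

    def __truediv__(self, part: str) -> "Mount":
        return Mount(posixpath.join(self.root, part))

    def __str__(self) -> str:
        return self.root

    def __enter__(self) -> "Mount":
        # A real implementation could register the mount / open connections here.
        return self

    def __exit__(self, *exc) -> None:
        # ... and unmount / clean up here.
        return None


mnt = Mount("s3://warehouse")
print(mnt / "data" / "cool" / "stuff")  # s3://warehouse/data/cool/stuff
```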
I was reading the AIP doc in general, and I feel like revising the PR in parallel is making the discussion a bit too complex. As I added a few comments on the AIP, I'm just adding a few here, and I would favor having the AIP settled first before code and AIP (= concepts) are discussed in parallel - it feels a bit inconsistent.

When reading the comments I feel stronger in my opinion that the concept of a "mount" really is a bit misplaced as a term. As also commented in the AIP, this gives users the feeling that something is really being done at OS level. Seeing the code examples by @uranusjr I would really favor (1) using the Pythonic way of context managers and dropping the idea of mounting (it is also not needed to mount in

Not that it should sound discouraging - I very much like the IO library idea in order to reduce the XtoY Operators, but I fear that adding a totally new FS/IO concept without integration of the existing Connection/Dataset/Provider core concepts does not have a consistent product smell. Therefore I'd favor first comparing (general) options for FS concept integration, with their pro(s) and con(s) (maybe with example DAG code), before having a code-level discussion which is hard to follow.
@jens-scheffler-bosch AFS works on a fundamentally different layer than Datasets do. Datasets are currently just metadata pointing to arbitrary data and are treated as such in Airflow. They are also only evaluated at parsing time, which is a major shortcoming in my opinion. AFS gives a consistent generic API on top of storage. While that might look the same or closely related at first, it is not when you consider that Datasets could point to Tables, which are a structured way of representing a consistent set of data, which in turn are backed by a (set of) file(s) in a filesystem residing on storage. As mentioned on the AIP, I do think that bringing Datasets closer to AFS makes sense, but I also think it is out of scope for this AIP. More a "for further research" kind of thing than something to be included now.

On mount points, I am willing to consider letting them go, but I think it is also too early now. In the arguments against them I have only seen "it is confusing for users", where users are DAG authors only. As mentioned, AFS targets DAGs, but also XCom and DAG processing. That perspective seems not to be included when arguing against using mount points. Then, on the content level, there are other examples that use a Virtual File System (as AFS is). The primary one that comes to mind is Databricks FS (dbfs), which does use mount points virtually. It seems not to be confusing for their users, so 'user data' is not necessarily pointing towards 'confusing'. Another reason is to provide efficiency in cross-filesystem operations (for example s3 to gcs) or same-filesystem operations (like s3 to s3) without requiring the user to deal with that. Again, this is not considered when arguing against mount points. Finally, using mount points allows users not to think about Airflow's complexity: by just changing what a mount point points to, your code operates on local storage, s3 or gcs.

I am not sure if you read the PR correctly, as it already supports Connections and Providers. I intend to keep the PR updated. There are some disadvantages, but it keeps the discussion grounded and allows people to play and experiment with it as well. Otherwise we will just be doing a paper exercise on pros and cons no one has practical experience with. DAG code is in this PR, although it is not even required to be in a DAG(!). It works the same across all, which was the intention. Note that 'Path' and 'mount points' can coexist. They are not mutually exclusive. In fact, as mentioned above, a

For this PR what I intend to do is:
Co-authored-by: Andrey Anshin <[email protected]>
Can I have a (final) review @potiuk @Taragolis @uranusjr?
I was concerned about the serialization of the providers' ObjectStores, but checking the documentation, it looks like all the fsspec objects are safe to serialize.

Great work! It looks good to me, LGTM.

I added it to milestone 2.8.0 since the vote can be closed and already has the needed binding votes.
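As a quick sanity check of that serialization point (an illustration, not part of the PR), fsspec filesystem instances are designed to be picklable, shown here with the in-memory implementation:

```python
import pickle

import fsspec

fs = fsspec.filesystem("memory")
restored = pickle.loads(pickle.dumps(fs))
print(type(restored).__name__)  # MemoryFileSystem
```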
The apache#34729 has been merged without waiting for static checks and broke the regenerated images.
The CI scripts in the `build-image` workflow are taken from main, so when you add a new (preinstalled) provider to be installed for the PROD image, the provider added in the PR in the file is not present in the build-image workflow. Moving the file to airflow/providers is safe because it does not contain any code - just a list of packages that the breeze command will use when building the PROD image. Related to #34729 (cherry picked from commit eb76276)
…35191) The #35099 switched installation of the local venv for Airflow to released packages, in order to accommodate the case from #34729 where we locally install a new provider that is preinstalled but has not yet been released. This however has a side effect - some k8s tests use the K8S Pod operator to run the tests, and if this operator has been changed, the local - modified - version of it is not going to be used for tests. Another side effect is that in case of a new installation, when the constraint installation does not work (for example constraints in main are conflicting with released airflow), `pip` starts to backtrack - and for conflicting requirements it produces strange results (for example it attempts to install airflow 1.10 and fails due to the "slugify" dependency). The previous version of it had another problem - once installed, it never changed, so in case someone used breeze to run k8s tests locally and iterated over changes in the K8SPod Operator, only the version from the moment the k8s environment was installed was used.

Both cases are now handled better:

* INSTALL_PROVIDERS_FROM_SOURCES is used as an env variable to make sure new providers (including preinstalled providers) are found in Airflow sources, not in PyPI
* The "." is used again to install Airflow, but also the -e (editable) installation is used for it - so that any changes to the local version of k8s are used. We are not using constraints for installation.
* Dry run and verbose mode of installing the venv are improved a bit - they show better what's going on (and dry_run does not interact with the installation in unexpected ways - deleting the installed venv without recreating it).

We already handled a change so that the k8s test environment is reinstalled after the requirements file changes, and caching in CI includes the hash of the requirements file as content - so we do not need to handle reinstallation of the venv or caching in CI. The venv should be appropriately reinstalled as needed. (cherry picked from commit ba28334)
(Link to AIP-58 for convenience. — @uranusjr)
updated: 2023-10-17
This adds Airflow Store, which is a generic abstraction on top of a filesystem or object storage. It's inspired by databricks fs (dbfs), partially ported from Apache Iceberg, and based on `fsspec`. The idea is that you can do the following. Standard python file operations work (e.g. see shutil.copyfileobj in the generic FileTransfer Operator).
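As a rough, hedged illustration of the kind of usage described (written against plain fsspec, with the in-memory filesystem standing in for s3/gcs so it runs without credentials; not the PR's verbatim example):

```python
import shutil

import fsspec

# Create a small local file to copy.
with open("/tmp/local.csv", "w") as f:
    f.write("a,b\n1,2\n")

# The in-memory filesystem stands in for s3/gcs/adl here.
remote = fsspec.filesystem("memory")

# Standard Python file operations work across filesystems.
with open("/tmp/local.csv", "rb") as src, remote.open("/warehouse/data.csv", "wb") as dst:
    shutil.copyfileobj(src, dst)

print(remote.cat("/warehouse/data.csv"))  # b'a,b\n1,2\n'
```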
As this is based on `fsspec`, any object storage or filesystem implementing fsspec is supported. For now s3, gcs, adl and local file systems are supported. There are options for e.g. webhdfs. As fsspec brings a key/value mapper, it can be used as an XCom backend in the future. It can also work as a backend for DAGs, so we are not necessarily dependent on git-sync or the like any more.
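To illustrate the key/value mapper point (again a hedged example using fsspec directly, not Airflow's XCom machinery; the key layout is made up):

```python
import fsspec

# get_mapper exposes any fsspec-backed location as a dict-like key/value store.
store = fsspec.get_mapper("memory://xcom")
store["dag_id/task_id/return_value"] = b'{"rows": 42}'
print(store["dag_id/task_id/return_value"])  # b'{"rows": 42}'
```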
The astro-sdk abstractions, like `load_file`, could be built on top. This is work in progress. An Apache Arrow extension is forthcoming. The intention is to gather feedback :-).
@potiuk @dimberman @ashb @kaxil @BasPH