Skip to content
This repository has been archived by the owner on Jun 2, 2022. It is now read-only.

Commit

Permalink
Address feedback and fix issues around tracking file size
Browse files Browse the repository at this point in the history
Add `sizeValid` to track whether we need to call `plugin.Size` before
using `size`. This is only needed when opening a file WriteOnly to delay
a Read - to ensure we have the entire file content before Write - so we
only do it when necessary.

Handle `io.EOF` when loading the file in case a call to `Setattr`
increased it beyond the original size but didn't write to fill that
space, and pad with null characters when that happens.

Upgrade journal entries to warnings when they result in an error. FUSE
doesn't relay the actual error messages.

Delay releasing data and clearing entry cache until we release all
writers.

Tweak a few tests so we have a balance of ReadWrite tests that rely on
attribute size or calling Read to determine size. Update
TestTruncateAndWrite to ensure we don't call Read when we've called
Setattr to truncate the file first.

Add additional comments explaining how to use BlockReadable and
Writable, and some code comments to clarify how Open works.

Signed-off-by: Michael Smith <[email protected]>
  • Loading branch information
MikaelSmith committed Dec 18, 2019
1 parent 33e8f02 commit 780630d
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 37 deletions.
86 changes: 58 additions & 28 deletions fuse/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type file struct {
data []byte
// Size for content during writing or with unspecified size attribute
size uint64
// Tracks whether we still need to query plugin.Size for the entry's size before using size
sizeValid bool
}

func newFile(p *dir, e plugin.Entry) *file {
Expand Down Expand Up @@ -78,7 +80,14 @@ func getFileMode(entry plugin.Entry) os.FileMode {

var _ = fs.NodeOpener(&file{})

// Open a file for reading or writing.
// Open an entry for reading or writing. Several patterns exist for how to interact with entries.
// - An entry that only supports Read can only be opened ReadOnly.
// - An entry that supports both Read and Write can be opened in any mode.
// - An entry that only supports Write can only be opened WriteOnly.
//
// When writing and flushing a file, we may call Read on the entry (if it supports Read) even if
// opened WriteOnly. That only happens when performing a partial write, or when its Size attribute
// isn't set and there are no calls to Setattr to define the file size.
func (f *file) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
f.mux.Lock()
defer f.mux.Unlock()
Expand All @@ -96,19 +105,18 @@ func (f *file) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR
writable := plugin.WriteAction().IsSupportedOn(f.entry)
switch {
case req.Flags.IsReadOnly() && !readable:
activity.Record(ctx, "FUSE: Open read-only unsupported on %v", f)
activity.Warnf(ctx, "FUSE: Open read-only unsupported on %v", f)
return nil, fuse.ENOTSUP
case req.Flags.IsWriteOnly() && !writable:
activity.Record(ctx, "FUSE: Open write-only unsupported on %v", f)
activity.Warnf(ctx, "FUSE: Open write-only unsupported on %v", f)
return nil, fuse.ENOTSUP
case req.Flags.IsReadWrite() && (!readable || !writable):
activity.Record(ctx, "FUSE: Open read-write unsupported on %v", f)
activity.Warnf(ctx, "FUSE: Open read-write unsupported on %v", f)
return nil, fuse.ENOTSUP
}

if req.Flags.IsReadOnly() || req.Flags.IsReadWrite() {
attr := plugin.Attributes(entry)
if !attr.HasSize() {
if attr := plugin.Attributes(entry); !attr.HasSize() {
// The entry's content size is unknown so open the file in direct IO mode. This enables FUSE
// to still read the entry's content so that built-in tools like cat and grep still work.
resp.Flags |= fuse.OpenDirectIO
Expand All @@ -119,15 +127,24 @@ func (f *file) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR
if err != nil {
return nil, err
}
} else {
f.size = attr.Size()
}

f.sizeValid = true
} else if !readable {
// Reported size is irrelevant for write-only entries.
f.sizeValid = true
}

return f, nil
}

var _ = fs.HandleReleaser(&file{})

func (f *file) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
if req.ReleaseFlags&fuse.ReleaseFlush != 0 {
activity.Record(ctx, "FUSE: Invoking Flush for Release on %v", f)
err := f.Flush(ctx, &fuse.FlushRequest{
Header: req.Header,
Handle: req.Handle,
Expand All @@ -138,6 +155,18 @@ func (f *file) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
}
}

if _, ok := f.writers[req.Handle]; ok {
delete(f.writers, req.Handle)

if len(f.writers) == 0 {
// If we just released the last writer, release the data buffer to conserve memory and
// invalidate cache on the entry and its parent so we get updated content and size on the
// next request. Leave size so we can use it for future attribute requests.
f.data = nil
plugin.ClearCacheFor(plugin.ID(f.entry), true)
}
}

activity.Record(ctx, "FUSE: Release %v: %+v", f, *req)
return nil
}
Expand Down Expand Up @@ -187,7 +216,7 @@ func (f *file) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wri
if newLen := int(newLen); newLen > len(f.data) {
f.data = append(f.data, make([]byte, newLen-len(f.data))...)
}
// Write-only entries are assumed not to have a size.
// Size is irrelevant for write-only entries.
if plugin.ReadAction().IsSupportedOn(f.entry) && f.size < uint64(newLen) {
f.size = uint64(newLen)
}
Expand All @@ -199,7 +228,7 @@ func (f *file) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wri

func (f *file) load(ctx context.Context, start, end int64) ([]byte, error) {
if !plugin.ReadAction().IsSupportedOn(f.entry) {
activity.Record(ctx, "FUSE: Non-contiguous writes (at %v) unsupported on %v", start, f)
activity.Warnf(ctx, "FUSE: Non-contiguous writes (at %v) unsupported on %v", start, f)
return nil, fuse.ENOTSUP
}

Expand All @@ -214,7 +243,6 @@ func (f *file) Flush(ctx context.Context, req *fuse.FlushRequest) error {
f.mux.Lock()
defer f.mux.Unlock()
activity.Record(ctx, "FUSE: Flush %v: %+v", f, *req)
var releasedWriter bool

// If this handle had an open writer, write current data.
if _, ok := f.writers[req.Handle]; ok {
Expand All @@ -224,28 +252,36 @@ func (f *file) Flush(ctx context.Context, req *fuse.FlushRequest) error {
panic("Size was not kept up-to-date with changes to data.")
}

if !f.sizeValid {
// If this is a readable entry that's opened WriteOnly and hasn't been truncated, we haven't
// determined its size. Get it now so we can buffer as needed.
size, err := plugin.Size(ctx, f.entry)
if err != nil {
return err
}
f.size = size
f.sizeValid = true
}

if uint64(dataLen) < f.size {
// Missing some data, load the remainder before writing.
data, err := f.load(ctx, dataLen, int64(f.size))
if err != nil {
if err != nil && err != io.EOF {
return err
}
f.data = append(f.data, data...)

// If still too small then something increased the size beyond the original.
// Fill with null characters.
if sz := uint64(len(f.data)); sz < f.size {
f.data = append(f.data, make([]byte, f.size-sz)...)
}
}

if err := plugin.WriteWithAnalytics(ctx, f.entry.(plugin.Writable), f.data); err != nil {
activity.Warnf(ctx, "FUSE: Error writing %v: %v", f, err)
return err
}
delete(f.writers, req.Handle)
releasedWriter = true
}

if len(f.writers) == 0 && releasedWriter {
// Ensure data is released. Leave size so we can use it for future attribute requests.
f.data = nil

// Invalidate cache on the entry and its parent. Need to get updated content and size.
plugin.ClearCacheFor(plugin.ID(f.entry), true)
}
return nil
}
Expand All @@ -270,14 +306,8 @@ func (f *file) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse
// Update known size. If the caller tries to increase the size of a Write-only entry, Flush
// will error because we won't be able to read to fill it in. We choose to error instead of
// filling with null data because there's no obvious use-case for supporting it.
if f.size != req.Size {
f.size = req.Size
}

// Shrink data if too large. Filling if too small is left for Write/Flush to deal with.
if uint64(len(f.data)) > f.size {
f.data = f.data[:f.size]
}
f.size = req.Size
f.sizeValid = true
}

f.fillAttr(&resp.Attr)
Expand Down
95 changes: 86 additions & 9 deletions fuse/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (suite *fileTestSuite) TestOpenAndRelease_WriteOnly() {

func (suite *fileTestSuite) TestOpenAndFlush() {
m := plugintest.NewMockReadWrite()
m.On("Read", suite.ctx).Return([]byte("hello"), nil).Once()
m.Attributes().SetSize(5)

f := newFile(nil, m)
req := fuse.OpenRequest{Flags: fuse.OpenReadWrite}
Expand All @@ -187,13 +187,11 @@ func (suite *fileTestSuite) TestOpenAndFlush() {
suite.Empty(f.writers)
suite.Nil(f.data)
}

m.AssertExpectations(suite.T())
}

func (suite *fileTestSuite) TestOpenAndFlushRelease() {
m := plugintest.NewMockReadWrite()
m.On("Read", suite.ctx).Return([]byte("hello"), nil).Once()
m.Attributes().SetSize(5)

f := newFile(nil, m)
req := fuse.OpenRequest{Flags: fuse.OpenReadWrite}
Expand All @@ -208,8 +206,6 @@ func (suite *fileTestSuite) TestOpenAndFlushRelease() {
suite.Empty(f.writers)
suite.Nil(f.data)
}

m.AssertExpectations(suite.T())
}

func (suite *fileTestSuite) TestAttrWithReaders() {
Expand Down Expand Up @@ -282,7 +278,8 @@ func (suite *fileTestSuite) TestRead() {

func (suite *fileTestSuite) TestWrite() {
m := plugintest.NewMockWrite()
m.On("Write", suite.ctx, []byte("hello")).Return(nil).Once()
// Called for Flush and ReleaseFlush.
m.On("Write", suite.ctx, []byte("hello")).Return(nil).Twice()

f := newFile(nil, m)
var resp fuse.OpenResponse
Expand All @@ -309,6 +306,7 @@ func (suite *fileTestSuite) TestWrite() {

func (suite *fileTestSuite) TestTruncateAndWrite() {
m := plugintest.NewMockReadWrite()
m.Attributes().SetSize(4)
m.On("Write", suite.ctx, []byte("hello")).Return(nil).Once()

f := newFile(nil, m)
Expand Down Expand Up @@ -338,6 +336,54 @@ func (suite *fileTestSuite) TestTruncateAndWrite() {
m.AssertExpectations(suite.T())
}

func (suite *fileTestSuite) TestGrowAndWrite() {
m := plugintest.NewMockReadWrite()
m.Attributes().SetSize(4)
m.On("Write", suite.ctx, append([]byte("hello"), make([]byte, 5)...)).Return(nil).Once()

f := newFile(nil, m)
var resp fuse.OpenResponse
handle, err := f.Open(suite.ctx, &fuse.OpenRequest{Flags: fuse.OpenWriteOnly}, &resp)
if !suite.NoError(err) || !suite.assertFileHandle(handle) {
suite.FailNow("Unusable handle")
}

setReq := fuse.SetattrRequest{Valid: fuse.SetattrHandle | fuse.SetattrSize, Handle: 1, Size: 10}
var setResp fuse.SetattrResponse
err = f.Setattr(suite.ctx, &setReq, &setResp)
suite.NoError(err)

writeReq := fuse.WriteRequest{Offset: 0, Data: []byte("hello"), Handle: 1}
var writeResp fuse.WriteResponse
err = handle.(fs.HandleWriter).Write(suite.ctx, &writeReq, &writeResp)
suite.NoError(err)
suite.Equal(5, writeResp.Size)

err = handle.(fs.HandleFlusher).Flush(suite.ctx, &fuse.FlushRequest{Handle: 1})
suite.NoError(err)

err = handle.(fs.HandleReleaser).Release(suite.ctx, &fuse.ReleaseRequest{Handle: 1})
suite.NoError(err)

m.AssertExpectations(suite.T())
}

func (suite *fileTestSuite) TestPartialWrite() {
m := plugintest.NewMockWrite()

f := newFile(nil, m)
var resp fuse.OpenResponse
handle, err := f.Open(suite.ctx, &fuse.OpenRequest{Flags: fuse.OpenWriteOnly}, &resp)
if !suite.NoError(err) || !suite.assertFileHandle(handle) {
suite.FailNow("Unusable handle")
}

writeReq := fuse.WriteRequest{Offset: 2, Data: []byte("11"), Handle: 1}
var writeResp fuse.WriteResponse
err = handle.(fs.HandleWriter).Write(suite.ctx, &writeReq, &writeResp)
suite.Error(err)
}

func (suite *fileTestSuite) TestReadWrite_Smaller() {
m := plugintest.NewMockBlockReadWrite()
m.Attributes().SetSize(5)
Expand All @@ -363,16 +409,47 @@ func (suite *fileTestSuite) TestReadWrite_Smaller() {
suite.NoError(err)
suite.Equal(2, writeResp.Size)

// Test this for correct size mid-write. After flush, the reported size will be
// whatever future calls to List return.
// Test this for correct size mid-write. After release, the reported size will be whatever
// future calls to List return.
var attr fuse.Attr
err = f.Attr(suite.ctx, &attr)
suite.NoError(err)
suite.Equal(uint64(4), attr.Size)

err = handle.(fs.HandleReleaser).Release(suite.ctx, &fuse.ReleaseRequest{ReleaseFlags: fuse.ReleaseFlush, Handle: 1})
suite.NoError(err)

m.AssertExpectations(suite.T())
}

func (suite *fileTestSuite) TestReadWrite_PartialWriteOnly() {
m := plugintest.NewMockReadWrite()
m.On("Read", suite.ctx).Return([]byte("hello"), nil).Once()
m.On("Write", suite.ctx, []byte("he11o")).Return(nil).Once()

f := newFile(nil, m)
var resp fuse.OpenResponse
handle, err := f.Open(suite.ctx, &fuse.OpenRequest{Flags: fuse.OpenWriteOnly}, &resp)
if !suite.NoError(err) || !suite.assertFileHandle(handle) {
suite.FailNow("Unusable handle")
}

writeReq := fuse.WriteRequest{Offset: 2, Data: []byte("11"), Handle: 1}
var writeResp fuse.WriteResponse
err = handle.(fs.HandleWriter).Write(suite.ctx, &writeReq, &writeResp)
suite.NoError(err)
suite.Equal(2, writeResp.Size)

err = handle.(fs.HandleFlusher).Flush(suite.ctx, &fuse.FlushRequest{Handle: 1})
suite.NoError(err)

// Test this for correct size mid-write. After release, the reported size will be whatever
// future calls to List return. Before flush, we haven't checked the actual size.
var attr fuse.Attr
err = f.Attr(suite.ctx, &attr)
suite.NoError(err)
suite.Equal(uint64(5), attr.Size)

err = handle.(fs.HandleReleaser).Release(suite.ctx, &fuse.ReleaseRequest{Handle: 1})
suite.NoError(err)

Expand Down
5 changes: 5 additions & 0 deletions plugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ type Streamable interface {
}

// BlockReadable is an entry with data that can be read in blocks.
// A BlockReadable entry must set its Size attribute.
type BlockReadable interface {
Entry
Read(ctx context.Context, size int64, offset int64) ([]byte, error)
Expand All @@ -200,6 +201,10 @@ type Readable interface {
// configuration change to an API, or writing data to a queue. It doesn't
// support a concept of a partial write.
//
// Writable can be implemented with or without Readable/BlockReadable. If an
// entry is only Writable, then only full writes (starting from offset 0) are
// allowed, anything else initiated by the filesystem will result in an error.
//
// It's up to the implementer to decide how much data integrity to guarantee.
type Writable interface {
Entry
Expand Down

0 comments on commit 780630d

Please sign in to comment.