Skip to content

Commit

Permalink
apacheGH-43359: [Go][Parquet] ReadRowGroups panics with canceled cont…
Browse files Browse the repository at this point in the history
…ext (apache#43360)

### Rationale for this change

`ReadRowGroups` needs to support externally canceled contexts, e.g. for request-scoped contexts in servers like gRPC.

### What changes are included in this PR?

Additionnaly, `releaseColumns` needs to ignore columns with uninitialized data as it used in a `defer` statement.

### Are these changes tested?

Yes: a new test `TestArrowReaderCanceledContext` is included.

### Are there any user-facing changes?

None
* GitHub Issue: apache#43359

Authored-by: sebdotv <[email protected]>
Signed-off-by: Joel Lubinitsky <[email protected]>
  • Loading branch information
sebdotv authored Jul 23, 2024
1 parent a88f0cd commit de19af9
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
5 changes: 5 additions & 0 deletions go/parquet/pqarrow/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pqarrow

import (
"context"
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -375,6 +376,10 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in
data.data.Release()
}

// if the context is in error, but we haven't set an error yet, then it means that the parent context
// was cancelled. In this case, we should exit early as some columns may not have been read yet.
err = errors.Join(err, ctx.Err())

if err != nil {
// if we encountered an error, consume any waiting data on the channel
// so the goroutines don't leak and so memory can get cleaned up. we already
Expand Down
23 changes: 23 additions & 0 deletions go/parquet/pqarrow/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,29 @@ func TestArrowReaderAdHocReadFloat16s(t *testing.T) {
}
}

func TestArrowReaderCanceledContext(t *testing.T) {
dataDir := getDataDir()

mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

filename := filepath.Join(dataDir, "int32_decimal.parquet")
require.FileExists(t, filename)

rdr, err := file.OpenParquetFile(filename, false, file.WithReadProps(parquet.NewReaderProperties(mem)))
require.NoError(t, err)
defer rdr.Close()
arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
require.NoError(t, err)

// create a canceled context
ctx, cancel := context.WithCancel(context.Background())
cancel()

_, err = arrowRdr.ReadTable(ctx)
require.ErrorIs(t, err, context.Canceled)
}

func TestRecordReaderParallel(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
Expand Down
4 changes: 3 additions & 1 deletion go/parquet/pqarrow/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func releaseArrayData(data []arrow.ArrayData) {

func releaseColumns(columns []arrow.Column) {
for _, col := range columns {
col.Release()
if col.Data() != nil { // data can be nil due to the way columns are constructed in ReadRowGroups
col.Release()
}
}
}

0 comments on commit de19af9

Please sign in to comment.