Skip to content

Commit

Permalink
apacheGH-38462: [Go][Parquet] Handle Boolean RLE encoding/decoding (a…
Browse files Browse the repository at this point in the history
…pache#38367)

### Rationale for this change
Looks like the parquet-testing repo files have been updated and now include boolean columns which use the RLE encoding type. This causes the Go parquet lib to fail verification tests when it pulls the most recent commits for the parquet-testing repository. So a solution for this is to actually implement the RleBoolean encoder and decoder.

### What changes are included in this PR?
Adding `RleBooleanEncoder` and `RleBooleanDecoder` and updating the `parquet-testing` repo.

### Are these changes tested?
Unit tests are added, and this is also tested via the `parquet-testing` golden files.

* Closes: apache#38345
* Closes: apache#38462

Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade and kou authored Oct 30, 2023
1 parent b42d44e commit 23b62a4
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 14 deletions.
8 changes: 7 additions & 1 deletion go/parquet/file/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package file

import (
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -345,14 +346,19 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
c.curDecoder = decoder
} else {
switch encoding {
case format.Encoding_RLE:
if c.descr.PhysicalType() != parquet.Types.Boolean {
return fmt.Errorf("parquet: only boolean supports RLE encoding, got %s", c.descr.PhysicalType())
}
fallthrough
case format.Encoding_PLAIN,
format.Encoding_DELTA_BYTE_ARRAY,
format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
format.Encoding_DELTA_BINARY_PACKED:
c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
c.decoders[encoding] = c.curDecoder
case format.Encoding_RLE_DICTIONARY:
return xerrors.New("parquet: dictionary page must be before data page")
return errors.New("parquet: dictionary page must be before data page")
case format.Encoding_BYTE_STREAM_SPLIT:
return fmt.Errorf("parquet: unsupported data encoding %s", encoding)
default:
Expand Down
61 changes: 61 additions & 0 deletions go/parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"crypto/rand"
"encoding/binary"
"io"
"os"
"path"
"testing"

"github.com/apache/arrow/go/v14/arrow/memory"
Expand Down Expand Up @@ -385,3 +387,62 @@ func TestDeltaLengthByteArrayPackingWithNulls(t *testing.T) {
assert.NotNil(t, readData[0])
}
}

func TestRleBooleanEncodingFileRead(t *testing.T) {
dir := os.Getenv("PARQUET_TEST_DATA")
if dir == "" {
t.Skip("no path supplied with PARQUET_TEST_DATA")
}
assert.DirExists(t, dir)

props := parquet.NewReaderProperties(memory.DefaultAllocator)
fileReader, err := file.OpenParquetFile(path.Join(dir, "rle_boolean_encoding.parquet"),
false, file.WithReadProps(props))
require.NoError(t, err)
defer fileReader.Close()

assert.Equal(t, 1, fileReader.NumRowGroups())
rgr := fileReader.RowGroup(0)
assert.EqualValues(t, 68, rgr.NumRows())

rdr, err := rgr.Column(0)
require.NoError(t, err)
brdr := rdr.(*file.BooleanColumnChunkReader)

values := make([]bool, 68)
defLvls, repLvls := make([]int16, 68), make([]int16, 68)
total, read, err := brdr.ReadBatch(68, values, defLvls, repLvls)
require.NoError(t, err)

assert.EqualValues(t, 68, total)
md, err := rgr.MetaData().ColumnChunk(0)
require.NoError(t, err)
stats, err := md.Statistics()
require.NoError(t, err)
assert.EqualValues(t, total-stats.NullCount(), read)

expected := []bool{
true, false, true, true, false, false,
true, true, true, false, false, true, true,
false, true, true, false, false, true, true,
false, true, true, false, false, true, true,
true, false, false, false, false, true, true,
false, true, true, false, false, true, true,
true, false, false, true, true, false, false,
true, true, true, false, true, true, false,
true, true, false, false, true, true, true,
}
expectedNulls := []int{2, 15, 23, 38, 48, 60}

expectedNullIdx := 0
for i, v := range defLvls {
if expectedNullIdx < len(expectedNulls) && i == expectedNulls[expectedNullIdx] {
assert.Zero(t, v)
expectedNullIdx++
} else {
assert.EqualValues(t, 1, v)
}
}

assert.Equal(t, expected, values[:len(expected)])
}
82 changes: 80 additions & 2 deletions go/parquet/internal/encoding/boolean_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
package encoding

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"

"github.com/apache/arrow/go/v14/arrow/bitutil"
shared_utils "github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
"golang.org/x/xerrors"
)

// PlainBooleanDecoder is for the Plain Encoding type, there is no
Expand Down Expand Up @@ -103,7 +108,80 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBit
return 0, err
}
if valuesRead != toRead {
return valuesRead, xerrors.New("parquet: boolean decoder: number of values / definition levels read did not match")
return valuesRead, errors.New("parquet: boolean decoder: number of values / definition levels read did not match")
}
return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
return dec.Decode(out)
}

type RleBooleanDecoder struct {
decoder

rleDec *utils.RleDecoder
}

func (RleBooleanDecoder) Type() parquet.Type {
return parquet.Types.Boolean
}

func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
dec.nvals = nvals

if len(data) < 4 {
return fmt.Errorf("invalid length - %d (corrupt data page?)", len(data))
}

// load the first 4 bytes in little-endian which indicates the length
nbytes := binary.LittleEndian.Uint32(data[:4])
if nbytes > uint32(len(data)-4) {
return fmt.Errorf("received invalid number of bytes - %d (corrupt data page?)", nbytes)
}

dec.data = data[4:]
if dec.rleDec == nil {
dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
} else {
dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
}
return nil
}

func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
max := shared_utils.MinInt(len(out), dec.nvals)

var (
buf [1024]uint64
n = max
)

for n > 0 {
batch := shared_utils.MinInt(len(buf), n)
decoded := dec.rleDec.GetBatch(buf[:batch])
if decoded != batch {
return max - n, io.ErrUnexpectedEOF
}

for i := 0; i < batch; i++ {
out[i] = buf[i] != 0
}
n -= batch
out = out[batch:]
}

dec.nvals -= max
return max, nil
}

func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
if nullCount > 0 {
toRead := len(out) - nullCount
valuesRead, err := dec.Decode(out[:toRead])
if err != nil {
return 0, err
}
if valuesRead != toRead {
return valuesRead, errors.New("parquet: rle boolean decoder: number of values / definition levels read did not match")
}
return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
Expand Down
55 changes: 55 additions & 0 deletions go/parquet/internal/encoding/boolean_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package encoding

import (
"encoding/binary"

"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/debug"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
)

Expand Down Expand Up @@ -87,3 +90,55 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer, error) {

return enc.sink.Finish(), nil
}

const rleLengthInBytes = 4

type RleBooleanEncoder struct {
encoder

bufferedValues []bool
}

func (RleBooleanEncoder) Type() parquet.Type {
return parquet.Types.Boolean
}

func (enc *RleBooleanEncoder) Put(in []bool) {
enc.bufferedValues = append(enc.bufferedValues, in...)
}

func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) {
bufferOut := make([]bool, len(in))
nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
enc.Put(bufferOut[:nvalid])
}

func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
return rleLengthInBytes + int64(enc.maxRleBufferSize())
}

func (enc *RleBooleanEncoder) maxRleBufferSize() int {
return utils.MaxRLEBufferSize(1, len(enc.bufferedValues)) +
utils.MinRLEBufferSize(1)
}

func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
rleBufferSizeMax := enc.maxRleBufferSize()
enc.sink.SetOffset(rleLengthInBytes)
enc.sink.Reserve(rleBufferSizeMax)

rleEncoder := utils.NewRleEncoder(enc.sink, 1)
for _, v := range enc.bufferedValues {
if v {
rleEncoder.Put(1)
} else {
rleEncoder.Put(0)
}
}
n := rleEncoder.Flush()
debug.Assert(n <= rleBufferSizeMax, "num encoded bytes larger than expected max")
buf := enc.sink.Finish()
binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n))

return buf, nil
}
2 changes: 1 addition & 1 deletion go/parquet/internal/encoding/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (d *dictEncoder) FlushValues() (Buffer, error) {
// EstimatedDataEncodedSize returns the maximum number of bytes needed to store the RLE encoded indexes, not including the
// dictionary index in the computation.
func (d *dictEncoder) EstimatedDataEncodedSize() int64 {
return 1 + int64(utils.MaxBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinBufferSize(d.BitWidth()))
return 1 + int64(utils.MaxRLEBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinRLEBufferSize(d.BitWidth()))
}

// NumEntries returns the number of entires in the dictionary index for this encoder.
Expand Down
12 changes: 12 additions & 0 deletions go/parquet/internal/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
b.checkRoundTrip(parquet.Encodings.Plain)
}

func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
switch b.typ {
case reflect.TypeOf(true):
b.initData(2000, 200)
b.checkRoundTrip(parquet.Encodings.RLE)
default:
b.T().SkipNow()
}
}

func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
b.initData(10000, 1)

Expand Down Expand Up @@ -408,6 +418,8 @@ func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() {
if validBits != nil {
b.checkRoundTripSpaced(parquet.Encodings.Plain, validBits, validBitsOffset)
switch b.typ {
case reflect.TypeOf(false):
b.checkRoundTripSpaced(parquet.Encodings.RLE, validBits, validBitsOffset)
case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)):
b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits, validBitsOffset)
case reflect.TypeOf(parquet.ByteArray{}):
Expand Down
2 changes: 1 addition & 1 deletion go/parquet/internal/encoding/levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func LevelEncodingMaxBufferSize(encoding parquet.Encoding, maxLvl int16, nbuffer
nbytes := 0
switch encoding {
case parquet.Encodings.RLE:
nbytes = utils.MaxBufferSize(bitWidth, nbuffered) + utils.MinBufferSize(bitWidth)
nbytes = utils.MaxRLEBufferSize(bitWidth, nbuffered) + utils.MinRLEBufferSize(bitWidth)
case parquet.Encodings.BitPacked:
nbytes = int(bitutil.BytesForBits(int64(nbuffered * bitWidth)))
default:
Expand Down
4 changes: 4 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *s
switch e {
case format.Encoding_PLAIN:
return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)}
{{- if eq .Name "Boolean" }}
case format.Encoding_RLE:
return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case format.Encoding_DELTA_BINARY_PACKED:
return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{
Expand Down Expand Up @@ -117,6 +121,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column,
switch e {
case parquet.Encodings.Plain:
return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- if eq .Name "Boolean" }}
case parquet.Encodings.RLE:
return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case parquet.Encodings.DeltaBinaryPacked:
if mem == nil {
Expand Down
13 changes: 7 additions & 6 deletions go/parquet/internal/testutils/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,15 +438,16 @@ func fillRandomIsValid(seed uint64, pctNull float64, out []bool) {
// If the type is parquet.ByteArray or parquet.FixedLenByteArray, heap must not be null.
//
// The default values are:
// []bool uses the current time as the seed with only values of 1 being false, for use
// of creating validity boolean slices.
// all other types use 0 as the seed
// a []parquet.ByteArray is populated with lengths between 2 and 12
// a []parquet.FixedLenByteArray is populated with fixed size random byte arrays of length 12.
//
// []bool uses the current time as the seed with only values of 1 being false, for use
// of creating validity boolean slices.
// all other types use 0 as the seed
// a []parquet.ByteArray is populated with lengths between 2 and 12
// a []parquet.FixedLenByteArray is populated with fixed size random byte arrays of length 12.
func InitValues(values interface{}, heap *memory.Buffer) {
switch arr := values.(type) {
case []bool:
fillRandomIsValid(uint64(time.Now().Unix()), 1.0, arr)
fillRandomIsValid(uint64(time.Now().Unix()), 0.5, arr)
case []int32:
FillRandomInt32(0, arr)
case []int64:
Expand Down
2 changes: 1 addition & 1 deletion go/parquet/internal/utils/bit_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (r *RLERandomSuite) checkRoundTrip(vals []uint64, width int) bool {

func (r *RLERandomSuite) checkRoundTripSpaced(vals arrow.Array, width int) {
nvalues := vals.Len()
bufsize := utils.MaxBufferSize(width, nvalues)
bufsize := utils.MaxRLEBufferSize(width, nvalues)

buffer := make([]byte, bufsize)
encoder := utils.NewRleEncoder(utils.NewWriterAtBuffer(buffer), width)
Expand Down
Loading

0 comments on commit 23b62a4

Please sign in to comment.