From 697a2cbaf834616a676f11f1b89b9200344de998 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 5 Sep 2023 10:39:43 -0400 Subject: [PATCH] GH-37419: [Go][Parquet] Decimal256 support for pqarrow (#37503) ### Rationale for this change When support for decimal128 was added to the Go Parquet lib, decimal256 wasn't yet implemented in the library. This adds proper support to read/write decimal256 with Parquet based on origin schemas and precision. ### Are these changes tested? yes * Closes: #37419 Authored-by: Matt Topol Signed-off-by: Matt Topol --- go/arrow/decimal128/decimal128.go | 5 + go/parquet/pqarrow/column_readers.go | 122 +++++++++++++++++++++--- go/parquet/pqarrow/encode_arrow.go | 63 ++++++++++++ go/parquet/pqarrow/encode_arrow_test.go | 65 +++++++++++-- go/parquet/pqarrow/schema.go | 43 +++++++-- go/parquet/writer_properties.go | 43 ++++++--- 6 files changed, 298 insertions(+), 43 deletions(-) diff --git a/go/arrow/decimal128/decimal128.go b/go/arrow/decimal128/decimal128.go index 68d729526d353..898d7b427ec84 100644 --- a/go/arrow/decimal128/decimal128.go +++ b/go/arrow/decimal128/decimal128.go @@ -26,6 +26,11 @@ import ( "github.com/apache/arrow/go/v14/arrow/internal/debug" ) +const ( + MaxPrecision = 38 + MaxScale = 38 +) + var ( MaxDecimal128 = New(542101086242752217, 687399551400673280-1) ) diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go index 6b84e56295ac6..759a3d8675927 100644 --- a/go/parquet/pqarrow/column_readers.go +++ b/go/parquet/pqarrow/column_readers.go @@ -30,6 +30,7 @@ import ( "github.com/apache/arrow/go/v14/arrow/array" "github.com/apache/arrow/go/v14/arrow/bitutil" "github.com/apache/arrow/go/v14/arrow/decimal128" + "github.com/apache/arrow/go/v14/arrow/decimal256" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/apache/arrow/go/v14/internal/utils" "github.com/apache/arrow/go/v14/parquet" @@ -493,14 +494,14 @@ func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr * data = transferDate64(rdr, valueType) case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING, arrow.LARGE_BINARY, arrow.LARGE_STRING: return transferBinary(rdr, valueType), nil - case arrow.DECIMAL: + case arrow.DECIMAL, arrow.DECIMAL256: switch descr.PhysicalType() { case parquet.Types.Int32, parquet.Types.Int64: data = transferDecimalInteger(rdr, valueType) case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray: return transferDecimalBytes(rdr.(file.BinaryRecordReader), valueType) default: - return nil, errors.New("physical type for decimal128 must be int32, int64, bytearray or fixed len byte array") + return nil, errors.New("physical type for decimal128/decimal256 must be int32, int64, bytearray or fixed len byte array") } case arrow.TIMESTAMP: tstype := valueType.(*arrow.TimestampType) @@ -722,10 +723,20 @@ func transferDecimalInteger(rdr file.RecordReader, dt arrow.DataType) arrow.Arra values = reflect.ValueOf(arrow.Int64Traits.CastFromBytes(rdr.Values())[:length]) } - data := make([]byte, arrow.Decimal128Traits.BytesRequired(length)) - out := arrow.Decimal128Traits.CastFromBytes(data) - for i := 0; i < values.Len(); i++ { - out[i] = decimal128.FromI64(values.Index(i).Int()) + var data []byte + switch dt.ID() { + case arrow.DECIMAL128: + data = make([]byte, arrow.Decimal128Traits.BytesRequired(length)) + out := arrow.Decimal128Traits.CastFromBytes(data) + for i := 0; i < values.Len(); i++ { + out[i] = decimal128.FromI64(values.Index(i).Int()) + } + case arrow.DECIMAL256: + data = make([]byte, arrow.Decimal256Traits.BytesRequired(length)) + out := arrow.Decimal256Traits.CastFromBytes(data) + for i := 0; i < values.Len(); i++ { + out[i] = decimal256.FromI64(values.Index(i).Int()) + } } var nullmap *memory.Buffer @@ -801,6 +812,52 @@ func bigEndianToDecimal128(buf []byte) (decimal128.Num, error) { return decimal128.New(hi, uint64(lo)), nil } +func bigEndianToDecimal256(buf []byte) (decimal256.Num, error) { + const ( + minDecimalBytes = 1 + maxDecimalBytes = 32 + ) + + if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes { + return decimal256.Num{}, + fmt.Errorf("%w: length of byte array for bigEndianToDecimal256 was %d but must be between %d and %d", + arrow.ErrInvalid, len(buf), minDecimalBytes, maxDecimalBytes) + } + + var littleEndian [4]uint64 + // bytes are coming in big-endian, so the first byte is the MSB and + // therefore holds the sign bit + initWord, isNeg := uint64(0), int8(buf[0]) < 0 + if isNeg { + // sign extend if necessary + initWord = uint64(0xFFFFFFFFFFFFFFFF) + } + + for wordIdx := 0; wordIdx < 4; wordIdx++ { + wordLen := utils.MinInt(len(buf), arrow.Uint64SizeBytes) + word := buf[len(buf)-wordLen:] + + if wordLen == 8 { + // full words can be assigned as-is + littleEndian[wordIdx] = binary.BigEndian.Uint64(word) + } else { + result := initWord + if len(buf) > 0 { + // incorporate the actual values if present + // shift left enough bits to make room for the incoming int64 + result = result << uint64(wordLen) + // preserve the upper bits by inplace OR-ing the int64 + result |= uint64FromBigEndianShifted(word) + } + littleEndian[wordIdx] = result + } + + buf = buf[:len(buf)-wordLen] + } + + return decimal256.New(littleEndian[3], littleEndian[2], littleEndian[1], littleEndian[0]), nil +} + type varOrFixedBin interface { arrow.Array Value(i int) []byte @@ -808,21 +865,19 @@ type varOrFixedBin interface { // convert physical byte storage, instead of integers, to decimal128 func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arrow.Chunked, error) { - convert := func(arr arrow.Array) (arrow.Array, error) { - length := arr.Len() + convert128 := func(in varOrFixedBin) (arrow.Array, error) { + length := in.Len() data := make([]byte, arrow.Decimal128Traits.BytesRequired(length)) out := arrow.Decimal128Traits.CastFromBytes(data) - input := arr.(varOrFixedBin) - nullCount := input.NullN() - + nullCount := in.NullN() var err error for i := 0; i < length; i++ { - if nullCount > 0 && input.IsNull(i) { + if nullCount > 0 && in.IsNull(i) { continue } - rec := input.Value(i) + rec := in.Value(i) if len(rec) <= 0 { return nil, fmt.Errorf("invalud BYTEARRAY length for type: %s", dt) } @@ -833,12 +888,51 @@ func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arro } ret := array.NewData(dt, length, []*memory.Buffer{ - input.Data().Buffers()[0], memory.NewBufferBytes(data), + in.Data().Buffers()[0], memory.NewBufferBytes(data), }, nil, nullCount, 0) defer ret.Release() return array.MakeFromData(ret), nil } + convert256 := func(in varOrFixedBin) (arrow.Array, error) { + length := in.Len() + data := make([]byte, arrow.Decimal256Traits.BytesRequired(length)) + out := arrow.Decimal256Traits.CastFromBytes(data) + + nullCount := in.NullN() + var err error + for i := 0; i < length; i++ { + if nullCount > 0 && in.IsNull(i) { + continue + } + + rec := in.Value(i) + if len(rec) <= 0 { + return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt) + } + out[i], err = bigEndianToDecimal256(rec) + if err != nil { + return nil, err + } + } + + ret := array.NewData(dt, length, []*memory.Buffer{ + in.Data().Buffers()[0], memory.NewBufferBytes(data), + }, nil, nullCount, 0) + defer ret.Release() + return array.MakeFromData(ret), nil + } + + convert := func(arr arrow.Array) (arrow.Array, error) { + switch dt.ID() { + case arrow.DECIMAL128: + return convert128(arr.(varOrFixedBin)) + case arrow.DECIMAL256: + return convert256(arr.(varOrFixedBin)) + } + return nil, arrow.ErrNotImplemented + } + chunks := rdr.GetBuilderChunks() var err error for idx, chunk := range chunks { diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go index 6784014281b8b..c3a0a50c43f45 100644 --- a/go/parquet/pqarrow/encode_arrow.go +++ b/go/parquet/pqarrow/encode_arrow.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "errors" "fmt" + "math" "time" "unsafe" @@ -28,10 +29,12 @@ import ( "github.com/apache/arrow/go/v14/arrow/array" "github.com/apache/arrow/go/v14/arrow/bitutil" "github.com/apache/arrow/go/v14/arrow/decimal128" + "github.com/apache/arrow/go/v14/arrow/decimal256" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/apache/arrow/go/v14/internal/utils" "github.com/apache/arrow/go/v14/parquet" "github.com/apache/arrow/go/v14/parquet/file" + "github.com/apache/arrow/go/v14/parquet/internal/debug" ) // get the count of the number of leaf arrays for the type @@ -327,6 +330,18 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr for idx, val := range leafArr.(*array.Date64).Date64Values() { data[idx] = int32(val / 86400000) // coerce date64 values } + case arrow.DECIMAL128: + for idx, val := range leafArr.(*array.Decimal128).Values() { + debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "casting Decimal128 greater than the value range; high bits must be 0 or -1") + debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32") + data[idx] = int32(val.LowBits()) + } + case arrow.DECIMAL256: + for idx, val := range leafArr.(*array.Decimal256).Values() { + debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "casting Decimal128 greater than the value range; high bits must be 0 or -1") + debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32") + data[idx] = int32(val.LowBits()) + } default: return fmt.Errorf("type mismatch, column is int32 writer, arrow array is %s, and not a compatible type", leafArr.DataType().Name()) } @@ -396,6 +411,20 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes()) data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()] } + case arrow.DECIMAL128: + ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) + data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) + for idx, val := range leafArr.(*array.Decimal128).Values() { + debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1") + data[idx] = int64(val.LowBits()) + } + case arrow.DECIMAL256: + ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len())) + data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes()) + for idx, val := range leafArr.(*array.Decimal256).Values() { + debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1") + data[idx] = int64(val.LowBits()) + } default: return fmt.Errorf("unimplemented arrow type to write to int64 column: %s", leafArr.DataType().Name()) } @@ -519,6 +548,40 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr } wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset())) } + case *arrow.Decimal256Type: + // parquet decimal are stored with FixedLength values where the length is + // proportional to the precision. Arrow's Decimal are always stored with 16/32 + // bytes. thus the internal FLBA must be adjusted by the offset calculation + offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision)) + ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth()) + scratch := ctx.dataBuffer.Bytes() + typeLen := wr.Descr().TypeLength() + fixDecimalEndianness := func(in decimal256.Num) parquet.FixedLenByteArray { + out := scratch[offset : offset+typeLen] + vals := in.Array() + binary.BigEndian.PutUint64(scratch, vals[3]) + binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], vals[2]) + binary.BigEndian.PutUint64(scratch[2*arrow.Uint64SizeBytes:], vals[1]) + binary.BigEndian.PutUint64(scratch[3*arrow.Uint64SizeBytes:], vals[0]) + scratch = scratch[4*arrow.Uint64SizeBytes:] + return out + } + + data := make([]parquet.FixedLenByteArray, leafArr.Len()) + arr := leafArr.(*array.Decimal256) + if leafArr.NullN() == 0 { + for idx := range data { + data[idx] = fixDecimalEndianness(arr.Value(idx)) + } + _, err = wr.WriteBatch(data, defLevels, repLevels) + } else { + for idx := range data { + if arr.IsValid(idx) { + data[idx] = fixDecimalEndianness(arr.Value(idx)) + } + } + wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset())) + } default: return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name()) } diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index 63fd4f142abba..654d3d813cf85 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -29,6 +29,7 @@ import ( "github.com/apache/arrow/go/v14/arrow/array" "github.com/apache/arrow/go/v14/arrow/bitutil" "github.com/apache/arrow/go/v14/arrow/decimal128" + "github.com/apache/arrow/go/v14/arrow/decimal256" "github.com/apache/arrow/go/v14/arrow/ipc" "github.com/apache/arrow/go/v14/arrow/memory" "github.com/apache/arrow/go/v14/internal/types" @@ -474,9 +475,9 @@ func getLogicalType(typ arrow.DataType) schema.LogicalType { default: panic("only micro and nano seconds are supported for arrow TIME64") } - case arrow.DECIMAL: - dec := typ.(*arrow.Decimal128Type) - return schema.NewDecimalLogicalType(dec.Precision, dec.Scale) + case arrow.DECIMAL, arrow.DECIMAL256: + dec := typ.(arrow.DecimalType) + return schema.NewDecimalLogicalType(dec.GetPrecision(), dec.GetScale()) } return schema.NoLogicalType{} } @@ -552,15 +553,15 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep parquet.R switch typ := typ.(type) { case *arrow.FixedSizeBinaryType: byteWidth = int32(typ.ByteWidth) - case *arrow.Decimal128Type: - byteWidth = pqarrow.DecimalSize(typ.Precision) + case arrow.DecimalType: + byteWidth = pqarrow.DecimalSize(typ.GetPrecision()) case *arrow.DictionaryType: valuesType := typ.ValueType switch dt := valuesType.(type) { case *arrow.FixedSizeBinaryType: byteWidth = int32(dt.ByteWidth) - case *arrow.Decimal128Type: - byteWidth = pqarrow.DecimalSize(dt.Precision) + case arrow.DecimalType: + byteWidth = pqarrow.DecimalSize(dt.GetPrecision()) } } @@ -932,6 +933,56 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() { ps.True(array.Equal(expected, chunked.Chunk(0))) } +func (ps *ParquetIOTestSuite) TestReadDecimal256() { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(ps.T(), 0) + + bigEndian := []parquet.ByteArray{ + // 123456 + []byte{1, 226, 64}, + // 987654 + []byte{15, 18, 6}, + // -123456 + []byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192}, + } + + bldr := array.NewDecimal256Builder(mem, &arrow.Decimal256Type{Precision: 40, Scale: 3}) + defer bldr.Release() + + bldr.Append(decimal256.FromU64(123456)) + bldr.Append(decimal256.FromU64(987654)) + bldr.Append(decimal256.FromI64(-123456)) + + expected := bldr.NewDecimal256Array() + defer expected.Release() + + sc := schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{ + schema.Must(schema.NewPrimitiveNodeLogical("decimals", parquet.Repetitions.Required, schema.NewDecimalLogicalType(40, 3), parquet.Types.ByteArray, -1, -1)), + }, -1)) + + sink := encoding.NewBufferWriter(0, mem) + defer sink.Release() + writer := file.NewParquetWriter(sink, sc) + + rgw := writer.AppendRowGroup() + cw, _ := rgw.NextColumn() + cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil) + cw.Close() + rgw.Close() + writer.Close() + + rdr := ps.createReader(mem, sink.Bytes()) + cr, err := rdr.GetColumn(context.TODO(), 0) + ps.NoError(err) + + chunked, err := cr.NextBatch(smallSize) + ps.NoError(err) + defer chunked.Release() + + ps.Len(chunked.Chunks(), 1) + ps.Truef(array.Equal(expected, chunked.Chunk(0)), "expected: %s\ngot: %s", expected, chunked.Chunk(0)) +} + func (ps *ParquetIOTestSuite) TestReadNestedStruct() { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(ps.T(), 0) diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index 4c1627263681a..9ba8554898986 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -23,6 +23,7 @@ import ( "strconv" "github.com/apache/arrow/go/v14/arrow" + "github.com/apache/arrow/go/v14/arrow/decimal128" "github.com/apache/arrow/go/v14/arrow/flight" "github.com/apache/arrow/go/v14/arrow/ipc" "github.com/apache/arrow/go/v14/arrow/memory" @@ -304,12 +305,22 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties case arrow.FIXED_SIZE_BINARY: typ = parquet.Types.FixedLenByteArray length = field.Type.(*arrow.FixedSizeBinaryType).ByteWidth - case arrow.DECIMAL: - typ = parquet.Types.FixedLenByteArray - dectype := field.Type.(*arrow.Decimal128Type) - precision = int(dectype.Precision) - scale = int(dectype.Scale) - length = int(DecimalSize(int32(precision))) + case arrow.DECIMAL, arrow.DECIMAL256: + dectype := field.Type.(arrow.DecimalType) + precision = int(dectype.GetPrecision()) + scale = int(dectype.GetScale()) + + if props.StoreDecimalAsInteger() && 1 <= precision && precision <= 18 { + if precision <= 9 { + typ = parquet.Types.Int32 + } else { + typ = parquet.Types.Int64 + } + } else { + typ = parquet.Types.FixedLenByteArray + length = int(DecimalSize(int32(precision))) + } + logicalType = schema.NewDecimalLogicalType(int32(precision), int32(scale)) case arrow.DATE32: typ = parquet.Types.Int32 @@ -521,6 +532,13 @@ func arrowTimestamp(logical *schema.TimestampLogicalType) (arrow.DataType, error } } +func arrowDecimal(logical *schema.DecimalLogicalType) arrow.DataType { + if logical.Precision() <= decimal128.MaxPrecision { + return &arrow.Decimal128Type{Precision: logical.Precision(), Scale: logical.Scale()} + } + return &arrow.Decimal256Type{Precision: logical.Precision(), Scale: logical.Scale()} +} + func arrowFromInt32(logical schema.LogicalType) (arrow.DataType, error) { switch logtype := logical.(type) { case schema.NoLogicalType: @@ -528,7 +546,7 @@ func arrowFromInt32(logical schema.LogicalType) (arrow.DataType, error) { case *schema.TimeLogicalType: return arrowTime32(logtype) case *schema.DecimalLogicalType: - return &arrow.Decimal128Type{Precision: logtype.Precision(), Scale: logtype.Scale()}, nil + return arrowDecimal(logtype), nil case *schema.IntLogicalType: return arrowInt(logtype) case schema.DateLogicalType: @@ -547,7 +565,7 @@ func arrowFromInt64(logical schema.LogicalType) (arrow.DataType, error) { case *schema.IntLogicalType: return arrowInt(logtype) case *schema.DecimalLogicalType: - return &arrow.Decimal128Type{Precision: logtype.Precision(), Scale: logtype.Scale()}, nil + return arrowDecimal(logtype), nil case *schema.TimeLogicalType: return arrowTime64(logtype) case *schema.TimestampLogicalType: @@ -562,7 +580,7 @@ func arrowFromByteArray(logical schema.LogicalType) (arrow.DataType, error) { case schema.StringLogicalType: return arrow.BinaryTypes.String, nil case *schema.DecimalLogicalType: - return &arrow.Decimal128Type{Precision: logtype.Precision(), Scale: logtype.Scale()}, nil + return arrowDecimal(logtype), nil case schema.NoLogicalType, schema.EnumLogicalType, schema.JSONLogicalType, @@ -576,7 +594,7 @@ func arrowFromByteArray(logical schema.LogicalType) (arrow.DataType, error) { func arrowFromFLBA(logical schema.LogicalType, length int) (arrow.DataType, error) { switch logtype := logical.(type) { case *schema.DecimalLogicalType: - return &arrow.Decimal128Type{Precision: logtype.Precision(), Scale: logtype.Scale()}, nil + return arrowDecimal(logtype), nil case schema.NoLogicalType, schema.IntervalLogicalType, schema.UUIDLogicalType: return &arrow.FixedSizeBinaryType{ByteWidth: int(length)}, nil default: @@ -1048,6 +1066,11 @@ func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField) (mo inferred.Field.Type = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: inferred.Field.Type, Ordered: dictOriginType.Ordered} modified = true + case arrow.DECIMAL256: + if inferred.Field.Type.ID() == arrow.DECIMAL128 { + inferred.Field.Type = origin.Type + modified = true + } } if origin.HasMetadata() { diff --git a/go/parquet/writer_properties.go b/go/parquet/writer_properties.go index bd172f7f92702..e4199f451f5f9 100644 --- a/go/parquet/writer_properties.go +++ b/go/parquet/writer_properties.go @@ -63,6 +63,7 @@ type ColumnProperties struct { // DefaultColumnProperties returns the default properties which get utilized for writing. // // The default column properties are the following constants: +// // Encoding: Encodings.Plain // Codec: compress.Codecs.Uncompressed // DictionaryEnabled: DefaultDictionaryEnabled @@ -290,19 +291,28 @@ func WithEncryptionProperties(props *FileEncryptionProperties) WriterProperty { } } +// WithStoreDecimalAsInteger specifies whether to try using an int32/int64 for storing +// decimal data rather than fixed len byte arrays if the precision is low enough. +func WithStoreDecimalAsInteger(enabled bool) WriterProperty { + return func(cfg *writerPropConfig) { + cfg.wr.storeDecimalAsInt = enabled + } +} + // WriterProperties is the collection of properties to use for writing a parquet file. The values are // read only once it has been constructed. type WriterProperties struct { - mem memory.Allocator - dictPagesize int64 - batchSize int64 - maxRowGroupLen int64 - pageSize int64 - parquetVersion Version - createdBy string - dataPageVersion DataPageVersion - rootName string - rootRepetition Repetition + mem memory.Allocator + dictPagesize int64 + batchSize int64 + maxRowGroupLen int64 + pageSize int64 + parquetVersion Version + createdBy string + dataPageVersion DataPageVersion + rootName string + rootRepetition Repetition + storeDecimalAsInt bool defColumnProps ColumnProperties columnProps map[string]*ColumnProperties @@ -330,8 +340,9 @@ func defaultWriterProperties() *WriterProperties { // properties will be utilized for writing. // // The Default properties use the following constants: +// // Allocator: memory.DefaultAllocator -// DictionaryPageSize: DefaultDictionaryPageSizeLimit +// DictionaryPageSize: DefaultDictionaryPageSizeLimit // BatchSize: DefaultWriteBatchSize // MaxRowGroupLength: DefaultMaxRowGroupLen // PageSize: DefaultDataPageSize @@ -413,7 +424,7 @@ func (w *WriterProperties) CompressionFor(path string) compress.Compression { return w.defColumnProps.Codec } -//CompressionPath is the same as CompressionFor but takes a ColumnPath +// CompressionPath is the same as CompressionFor but takes a ColumnPath func (w *WriterProperties) CompressionPath(path ColumnPath) compress.Compression { return w.CompressionFor(path.String()) } @@ -531,3 +542,11 @@ func (w *WriterProperties) ColumnEncryptionProperties(path string) *ColumnEncryp } return nil } + +// StoreDecimalAsInteger returns the config option controlling whether or not +// to try storing decimal data as an integer type if the precision is low enough +// (1 <= prec <= 18 can be stored as an int), otherwise it will be stored as +// a fixed len byte array. +func (w *WriterProperties) StoreDecimalAsInteger() bool { + return w.storeDecimalAsInt +}