diff --git a/go/arrow/avro/avro2parquet/main.go b/go/arrow/avro/avro2parquet/main.go new file mode 100644 index 0000000000000..45377b46a444c --- /dev/null +++ b/go/arrow/avro/avro2parquet/main.go @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bufio" + "bytes" + "flag" + "fmt" + "log" + "os" + "runtime/pprof" + "time" + + "github.com/apache/arrow/go/v15/arrow/avro" + "github.com/apache/arrow/go/v15/parquet" + "github.com/apache/arrow/go/v15/parquet/compress" + pq "github.com/apache/arrow/go/v15/parquet/pqarrow" +) + +var ( + cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`") + filepath = flag.String("file", "", "avro ocf to convert") +) + +func main() { + flag.Parse() + if *cpuprofile != "" { + f, err := os.Create(*cpuprofile) + if err != nil { + log.Fatal("could not create CPU profile: ", err) + } + defer f.Close() // error handling omitted for example + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatal("could not start CPU profile: ", err) + } + defer pprof.StopCPUProfile() + } + if *filepath == "" { + fmt.Println("no file specified") + } + chunk := 1024 * 8 + ts := time.Now() + log.Println("starting:") + info, err := os.Stat(*filepath) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + filesize := info.Size() + data, err := os.ReadFile(*filepath) + if err != nil { + fmt.Println(err) + os.Exit(2) + } + fmt.Printf("file : %v\nsize: %v MB\n", filepath, float64(filesize)/1024/1024) + + r := bytes.NewReader(data) + ior := bufio.NewReaderSize(r, 4096*8) + av2arReader, err := avro.NewOCFReader(ior, avro.WithChunk(chunk)) + if err != nil { + fmt.Println(err) + os.Exit(3) + } + fp, err := os.OpenFile(*filepath+".parquet", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + if err != nil { + fmt.Println(err) + os.Exit(4) + } + defer fp.Close() + pwProperties := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true), + parquet.WithVersion(parquet.V2_LATEST), + parquet.WithCompression(compress.Codecs.Snappy), + parquet.WithBatchSize(1024*32), + parquet.WithDataPageSize(1024*1024), + parquet.WithMaxRowGroupLength(64*1024*1024), + ) + awProperties := pq.NewArrowWriterProperties(pq.WithStoreSchema()) + pr, err := pq.NewFileWriter(av2arReader.Schema(), fp, pwProperties, awProperties) + if err != nil { + fmt.Println(err) + os.Exit(5) + } + defer pr.Close() + fmt.Printf("parquet version: %v\n", pwProperties.Version()) + for av2arReader.Next() { + if av2arReader.Err() != nil { + fmt.Println(err) + os.Exit(6) + } + recs := av2arReader.Record() + err = pr.WriteBuffered(recs) + if err != nil { + fmt.Println(err) + os.Exit(7) + } + recs.Release() + } + if av2arReader.Err() != nil { + fmt.Println(av2arReader.Err()) + } + + pr.Close() + log.Printf("time to convert: %v\n", time.Since(ts)) +} diff --git a/go/arrow/avro/loader.go b/go/arrow/avro/loader.go new file mode 100644 index 0000000000000..26d8678e8e2be --- /dev/null +++ b/go/arrow/avro/loader.go @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "errors" + "fmt" + "io" +) + +func (r *OCFReader) decodeOCFToChan() { + defer close(r.avroChan) + for r.r.HasNext() { + select { + case <-r.readerCtx.Done(): + r.err = fmt.Errorf("avro decoding cancelled, %d records read", r.avroDatumCount) + return + default: + var datum any + err := r.r.Decode(&datum) + if err != nil { + if errors.Is(err, io.EOF) { + r.err = nil + return + } + r.err = err + return + } + r.avroChan <- datum + r.avroDatumCount++ + } + } +} + +func (r *OCFReader) recordFactory() { + defer close(r.recChan) + r.primed = true + recChunk := 0 + switch { + case r.chunk < 1: + for data := range r.avroChan { + err := r.ldr.loadDatum(data) + if err != nil { + r.err = err + return + } + } + r.recChan <- r.bld.NewRecord() + r.bldDone <- struct{}{} + case r.chunk >= 1: + for data := range r.avroChan { + if recChunk == 0 { + r.bld.Reserve(r.chunk) + } + err := r.ldr.loadDatum(data) + if err != nil { + r.err = err + return + } + recChunk++ + if recChunk >= r.chunk { + r.recChan <- r.bld.NewRecord() + recChunk = 0 + } + } + if recChunk != 0 { + r.recChan <- r.bld.NewRecord() + } + r.bldDone <- struct{}{} + } +} diff --git a/go/arrow/avro/reader.go b/go/arrow/avro/reader.go new file mode 100644 index 0000000000000..e72a5632bdd6e --- /dev/null +++ b/go/arrow/avro/reader.go @@ -0,0 +1,337 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "context" + "errors" + "fmt" + "io" + "sync/atomic" + + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/internal/debug" + "github.com/apache/arrow/go/v15/arrow/memory" + "github.com/hamba/avro/v2/ocf" + "github.com/tidwall/sjson" + + avro "github.com/hamba/avro/v2" +) + +var ErrMismatchFields = errors.New("arrow/avro: number of records mismatch") + +// Option configures an Avro reader/writer. +type ( + Option func(config) + config *OCFReader +) + +type schemaEdit struct { + method string + path string + value any +} + +// Reader wraps goavro/OCFReader and creates array.Records from a schema. +type OCFReader struct { + r *ocf.Decoder + avroSchema string + avroSchemaEdits []schemaEdit + schema *arrow.Schema + + refs int64 + bld *array.RecordBuilder + bldMap *fieldPos + ldr *dataLoader + cur arrow.Record + err error + + primed bool + readerCtx context.Context + readCancel func() + maxOCF int + maxRec int + + avroChan chan any + avroDatumCount int64 + avroChanSize int + recChan chan arrow.Record + + bldDone chan struct{} + + recChanSize int + chunk int + mem memory.Allocator +} + +// NewReader returns a reader that reads from an Avro OCF file and creates +// arrow.Records from the converted avro data. +func NewOCFReader(r io.Reader, opts ...Option) (*OCFReader, error) { + ocfr, err := ocf.NewDecoder(r) + if err != nil { + return nil, fmt.Errorf("%w: could not create avro ocfreader", arrow.ErrInvalid) + } + + rr := &OCFReader{ + r: ocfr, + refs: 1, + chunk: 1, + avroChanSize: 500, + recChanSize: 10, + } + for _, opt := range opts { + opt(rr) + } + + rr.avroChan = make(chan any, rr.avroChanSize) + rr.recChan = make(chan arrow.Record, rr.recChanSize) + rr.bldDone = make(chan struct{}) + schema, err := avro.Parse(string(ocfr.Metadata()["avro.schema"])) + if err != nil { + return nil, fmt.Errorf("%w: could not parse avro header", arrow.ErrInvalid) + } + rr.avroSchema = schema.String() + if len(rr.avroSchemaEdits) > 0 { + // execute schema edits + for _, e := range rr.avroSchemaEdits { + err := rr.editAvroSchema(e) + if err != nil { + return nil, fmt.Errorf("%w: could not edit avro schema", arrow.ErrInvalid) + } + } + // validate edited schema + schema, err = avro.Parse(rr.avroSchema) + if err != nil { + return nil, fmt.Errorf("%w: could not parse modified avro schema", arrow.ErrInvalid) + } + } + rr.schema, err = ArrowSchemaFromAvro(schema) + if err != nil { + return nil, fmt.Errorf("%w: could not convert avro schema", arrow.ErrInvalid) + } + if rr.mem == nil { + rr.mem = memory.DefaultAllocator + } + rr.readerCtx, rr.readCancel = context.WithCancel(context.Background()) + go rr.decodeOCFToChan() + + rr.bld = array.NewRecordBuilder(rr.mem, rr.schema) + rr.bldMap = newFieldPos() + rr.ldr = newDataLoader() + for idx, fb := range rr.bld.Fields() { + mapFieldBuilders(fb, rr.schema.Field(idx), rr.bldMap) + } + rr.ldr.drawTree(rr.bldMap) + go rr.recordFactory() + return rr, nil +} + +// Reuse allows the OCFReader to be reused to read another Avro file provided the +// new Avro file has an identical schema. +func (rr *OCFReader) Reuse(r io.Reader, opts ...Option) error { + rr.Close() + rr.err = nil + ocfr, err := ocf.NewDecoder(r) + if err != nil { + return fmt.Errorf("%w: could not create avro ocfreader", arrow.ErrInvalid) + } + schema, err := avro.Parse(string(ocfr.Metadata()["avro.schema"])) + if err != nil { + return fmt.Errorf("%w: could not parse avro header", arrow.ErrInvalid) + } + if rr.avroSchema != schema.String() { + return fmt.Errorf("%w: avro schema mismatch", arrow.ErrInvalid) + } + + rr.r = ocfr + for _, opt := range opts { + opt(rr) + } + + rr.maxOCF = 0 + rr.maxRec = 0 + rr.avroDatumCount = 0 + rr.primed = false + + rr.avroChan = make(chan any, rr.avroChanSize) + rr.recChan = make(chan arrow.Record, rr.recChanSize) + rr.bldDone = make(chan struct{}) + + rr.readerCtx, rr.readCancel = context.WithCancel(context.Background()) + go rr.decodeOCFToChan() + go rr.recordFactory() + return nil +} + +// Err returns the last error encountered during the iteration over the +// underlying Avro file. +func (r *OCFReader) Err() error { return r.err } + +// AvroSchema returns the Avro schema of the Avro OCF +func (r *OCFReader) AvroSchema() string { return r.avroSchema } + +// Schema returns the converted Arrow schema of the Avro OCF +func (r *OCFReader) Schema() *arrow.Schema { return r.schema } + +// Record returns the current record that has been extracted from the +// underlying Avro OCF file. +// It is valid until the next call to Next. +func (r *OCFReader) Record() arrow.Record { return r.cur } + +// Metrics returns the maximum queue depth of the Avro record read cache and of the +// converted Arrow record cache. +func (r *OCFReader) Metrics() string { + return fmt.Sprintf("Max. OCF queue depth: %d/%d Max. record queue depth: %d/%d", r.maxOCF, r.avroChanSize, r.maxRec, r.recChanSize) +} + +// OCFRecordsReadCount returns the number of Avro datum that were read from the Avro file. +func (r *OCFReader) OCFRecordsReadCount() int64 { return r.avroDatumCount } + +// Close closes the OCFReader's Avro record read cache and converted Arrow record cache. OCFReader must +// be closed if the Avro OCF's records have not been read to completion. +func (r *OCFReader) Close() { + r.readCancel() + r.err = r.readerCtx.Err() +} + +func (r *OCFReader) editAvroSchema(e schemaEdit) error { + var err error + switch e.method { + case "set": + r.avroSchema, err = sjson.Set(r.avroSchema, e.path, e.value) + if err != nil { + return fmt.Errorf("%w: schema edit 'set %s = %v' failure - %v", arrow.ErrInvalid, e.path, e.value, err) + } + case "delete": + r.avroSchema, err = sjson.Delete(r.avroSchema, e.path) + if err != nil { + return fmt.Errorf("%w: schema edit 'delete' failure - %v", arrow.ErrInvalid, err) + } + default: + return fmt.Errorf("%w: schema edit method must be 'set' or 'delete'", arrow.ErrInvalid) + } + return nil +} + +// Next returns whether a Record can be received from the converted record queue. +// The user should check Err() after call to Next that return false to check +// if an error took place. +func (r *OCFReader) Next() bool { + if r.cur != nil { + r.cur.Release() + r.cur = nil + } + if r.maxOCF < len(r.avroChan) { + r.maxOCF = len(r.avroChan) + } + if r.maxRec < len(r.recChan) { + r.maxRec = len(r.recChan) + } + select { + case r.cur = <-r.recChan: + case <-r.bldDone: + if len(r.recChan) > 0 { + r.cur = <-r.recChan + } + } + if r.err != nil { + return false + } + + return r.cur != nil +} + +// WithAllocator specifies the Arrow memory allocator used while building records. +func WithAllocator(mem memory.Allocator) Option { + return func(cfg config) { + cfg.mem = mem + } +} + +// WithReadCacheSize specifies the size of the OCF record decode queue, default value +// is 500. +func WithReadCacheSize(n int) Option { + return func(cfg config) { + if n < 1 { + cfg.avroChanSize = 500 + } else { + cfg.avroChanSize = n + } + } +} + +// WithRecordCacheSize specifies the size of the converted Arrow record queue, default +// value is 1. +func WithRecordCacheSize(n int) Option { + return func(cfg config) { + if n < 1 { + cfg.recChanSize = 1 + } else { + cfg.recChanSize = n + } + } +} + +// WithSchemaEdit specifies modifications to the Avro schema. Supported methods are 'set' and +// 'delete'. Set sets the value for the specified path. Delete deletes the value for the specified path. +// A path is in dot syntax, such as "fields.1" or "fields.0.type". The modified Avro schema is +// validated before conversion to Arrow schema - NewOCFReader will return an error if the modified schema +// cannot be parsed. +func WithSchemaEdit(method, path string, value any) Option { + return func(cfg config) { + var e schemaEdit + e.method = method + e.path = path + e.value = value + cfg.avroSchemaEdits = append(cfg.avroSchemaEdits, e) + } +} + +// WithChunk specifies the chunk size used while reading Avro OCF files. +// +// If n is zero or 1, no chunking will take place and the reader will create +// one record per row. +// If n is greater than 1, chunks of n rows will be read. +// If n is negative, the reader will load the whole Avro OCF file into memory and +// create one big record with all the rows. +func WithChunk(n int) Option { + return func(cfg config) { + cfg.chunk = n + } +} + +// Retain increases the reference count by 1. +// Retain may be called simultaneously from multiple goroutines. +func (r *OCFReader) Retain() { + atomic.AddInt64(&r.refs, 1) +} + +// Release decreases the reference count by 1. +// When the reference count goes to zero, the memory is freed. +// Release may be called simultaneously from multiple goroutines. +func (r *OCFReader) Release() { + debug.Assert(atomic.LoadInt64(&r.refs) > 0, "too many releases") + + if atomic.AddInt64(&r.refs, -1) == 0 { + if r.cur != nil { + r.cur.Release() + } + } +} + +var _ array.RecordReader = (*OCFReader)(nil) diff --git a/go/arrow/avro/reader_test.go b/go/arrow/avro/reader_test.go new file mode 100644 index 0000000000000..e94d4f48fb933 --- /dev/null +++ b/go/arrow/avro/reader_test.go @@ -0,0 +1,364 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "fmt" + "testing" + + "github.com/apache/arrow/go/v15/arrow" + hamba "github.com/hamba/avro/v2" +) + +func TestEditSchemaStringEqual(t *testing.T) { + tests := []struct { + avroSchema string + arrowSchema []arrow.Field + }{ + { + avroSchema: `{ + "fields": [ + { + "name": "inheritNull", + "type": { + "name": "Simple", + "symbols": [ + "a", + "b" + ], + "type": "enum" + } + }, + { + "name": "explicitNamespace", + "type": { + "name": "test", + "namespace": "org.hamba.avro", + "size": 12, + "type": "fixed" + } + }, + { + "name": "fullName", + "type": { + "type": "record", + "name": "fullName_data", + "namespace": "ignored", + "doc": "A name attribute with a fullname, so the namespace attribute is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.", + "fields": [{ + "name": "inheritNamespace", + "type": { + "type": "enum", + "name": "Understanding", + "doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.", + "symbols": ["d", "e"] + } + }, { + "name": "md5", + "type": { + "name": "md5_data", + "type": "fixed", + "size": 16, + "namespace": "ignored" + } + } + ] + } + }, + { + "name": "id", + "type": "int" + }, + { + "name": "bigId", + "type": "long" + }, + { + "name": "temperature", + "type": [ + "null", + "float" + ] + }, + { + "name": "fraction", + "type": [ + "null", + "double" + ] + }, + { + "name": "is_emergency", + "type": "boolean" + }, + { + "name": "remote_ip", + "type": [ + "null", + "bytes" + ] + }, + { + "name": "person", + "type": { + "fields": [ + { + "name": "lastname", + "type": "string" + }, + { + "name": "address", + "type": { + "fields": [ + { + "name": "streetaddress", + "type": "string" + }, + { + "name": "city", + "type": "string" + } + ], + "name": "AddressUSRecord", + "type": "record" + } + }, + { + "name": "mapfield", + "type": { + "default": { + }, + "type": "map", + "values": "long" + } + }, + { + "name": "arrayField", + "type": { + "default": [ + ], + "items": "string", + "type": "array" + } + } + ], + "name": "person_data", + "type": "record" + } + }, + { + "name": "decimalField", + "type": { + "logicalType": "decimal", + "precision": 4, + "scale": 2, + "type": "bytes" + } + }, + { + "logicalType": "uuid", + "name": "uuidField", + "type": "string" + }, + { + "name": "timemillis", + "type": { + "type": "int", + "logicalType": "time-millis" + } + }, + { + "name": "timemicros", + "type": { + "type": "long", + "logicalType": "time-micros" + } + }, + { + "name": "timestampmillis", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "timestampmicros", + "type": { + "type": "long", + "logicalType": "timestamp-micros" + } + }, + { + "name": "duration", + "type": { + "name": "duration", + "namespace": "whyowhy", + "logicalType": "duration", + "size": 12, + "type": "fixed" + } + }, + { + "name": "date", + "type": { + "logicalType": "date", + "type": "int" + } + } + ], + "name": "Example", + "type": "record" + }`, + arrowSchema: []arrow.Field{ + { + Name: "explicitNamespace", + Type: &arrow.FixedSizeBinaryType{ByteWidth: 12}, + }, + { + Name: "fullName", + Type: arrow.StructOf( + arrow.Field{ + Name: "inheritNamespace", + Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String, Ordered: false}, + }, + arrow.Field{ + Name: "md5", + Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}, + }, + ), + }, + { + Name: "id", + Type: arrow.PrimitiveTypes.Int32, + }, + { + Name: "bigId", + Type: arrow.PrimitiveTypes.Int64, + }, + { + Name: "temperature", + Type: arrow.PrimitiveTypes.Float32, + Nullable: true, + }, + { + Name: "fraction", + Type: arrow.PrimitiveTypes.Float64, + Nullable: true, + }, + { + Name: "is_emergency", + Type: arrow.FixedWidthTypes.Boolean, + }, + { + Name: "remote_ip", + Type: arrow.BinaryTypes.Binary, + Nullable: true, + }, + { + Name: "person", + Type: arrow.StructOf( + arrow.Field{ + Name: "lastname", + Type: arrow.BinaryTypes.String, + }, + arrow.Field{ + Name: "address", + Type: arrow.StructOf( + arrow.Field{ + Name: "streetaddress", + Type: arrow.BinaryTypes.String, + }, + arrow.Field{ + Name: "city", + Type: arrow.BinaryTypes.String, + }, + ), + }, + arrow.Field{ + Name: "mapfield", + Type: arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int64), + Nullable: true, + }, + arrow.Field{ + Name: "arrayField", + Type: arrow.ListOfNonNullable(arrow.BinaryTypes.String), + }, + ), + }, + { + Name: "decimalField", + Type: &arrow.Decimal128Type{Precision: 4, Scale: 2}, + }, + { + Name: "uuidField", + Type: arrow.BinaryTypes.String, + }, + { + Name: "timemillis", + Type: arrow.FixedWidthTypes.Time32ms, + }, + { + Name: "timemicros", + Type: arrow.FixedWidthTypes.Time64us, + }, + { + Name: "timestampmillis", + Type: arrow.FixedWidthTypes.Timestamp_ms, + }, + { + Name: "timestampmicros", + Type: arrow.FixedWidthTypes.Timestamp_us, + }, + { + Name: "duration", + Type: arrow.FixedWidthTypes.MonthDayNanoInterval, + }, + { + Name: "date", + Type: arrow.FixedWidthTypes.Date32, + }, + }, + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + want := arrow.NewSchema(test.arrowSchema, nil) + + schema, err := hamba.ParseBytes([]byte(test.avroSchema)) + if err != nil { + t.Fatalf("%v", err) + } + r := new(OCFReader) + r.avroSchema = schema.String() + r.editAvroSchema(schemaEdit{method: "delete", path: "fields.0"}) + schema, err = hamba.Parse(r.avroSchema) + if err != nil { + t.Fatalf("%v: could not parse modified avro schema", arrow.ErrInvalid) + } + got, err := ArrowSchemaFromAvro(schema) + if err != nil { + t.Fatalf("%v", err) + } + if !(fmt.Sprintf("%+v", want.String()) == fmt.Sprintf("%+v", got.String())) { + t.Fatalf("got=%v,\n want=%v", got.String(), want.String()) + } else { + t.Logf("schema.String() comparison passed") + } + }) + } +} diff --git a/go/arrow/avro/reader_types.go b/go/arrow/avro/reader_types.go new file mode 100644 index 0000000000000..5658c6e587db2 --- /dev/null +++ b/go/arrow/avro/reader_types.go @@ -0,0 +1,875 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "math/big" + + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/decimal128" + "github.com/apache/arrow/go/v15/arrow/decimal256" + "github.com/apache/arrow/go/v15/arrow/memory" + "github.com/apache/arrow/go/v15/internal/types" +) + +type dataLoader struct { + idx, depth int32 + list *fieldPos + item *fieldPos + mapField *fieldPos + mapKey *fieldPos + mapValue *fieldPos + fields []*fieldPos + children []*dataLoader +} + +var ( + ErrNullStructData = errors.New("null struct data") +) + +func newDataLoader() *dataLoader { return &dataLoader{idx: 0, depth: 0} } + +// drawTree takes the tree of field builders produced by mapFieldBuilders() +// and produces another tree structure and aggregates fields whose values can +// be retrieved from a `map[string]any` into a slice of builders, and creates a hierarchy to +// deal with nested types (lists and maps). +func (d *dataLoader) drawTree(field *fieldPos) { + for _, f := range field.children() { + if f.isList || f.isMap { + if f.isList { + c := d.newListChild(f) + if !f.childrens[0].isList { + c.item = f.childrens[0] + c.drawTree(f.childrens[0]) + } else { + c.drawTree(f.childrens[0].childrens[0]) + } + } + if f.isMap { + c := d.newMapChild(f) + if !arrow.IsNested(f.childrens[1].builder.Type().ID()) { + c.mapKey = f.childrens[0] + c.mapValue = f.childrens[1] + } else { + c.mapKey = f.childrens[0] + m := c.newChild() + m.mapValue = f.childrens[1] + m.drawTree(f.childrens[1]) + } + } + } else { + d.fields = append(d.fields, f) + if len(f.children()) > 0 { + d.drawTree(f) + } + } + } +} + +// loadDatum loads decoded Avro data to the schema fields' builder functions. +// Since array.StructBuilder.AppendNull() will recursively append null to all of the +// struct's fields, in the case of nil being passed to a struct's builderFunc it will +// return a ErrNullStructData error to signal that all its sub-fields can be skipped. +func (d *dataLoader) loadDatum(data any) error { + if d.list == nil && d.mapField == nil { + if d.mapValue != nil { + d.mapValue.appendFunc(data) + } + var NullParent *fieldPos + for _, f := range d.fields { + if f.parent == NullParent { + continue + } + if d.mapValue == nil { + err := f.appendFunc(f.getValue(data)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } else { + switch dt := data.(type) { + case nil: + err := f.appendFunc(dt) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + case []any: + if len(d.children) < 1 { + for _, e := range dt { + err := f.appendFunc(e) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + } else { + for _, e := range dt { + d.children[0].loadDatum(e) + } + } + case map[string]any: + err := f.appendFunc(f.getValue(dt)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + + } + } + for _, c := range d.children { + if c.list != nil { + c.loadDatum(c.list.getValue(data)) + } + if c.mapField != nil { + switch dt := data.(type) { + case nil: + c.loadDatum(dt) + case map[string]any: + c.loadDatum(c.mapField.getValue(dt)) + default: + c.loadDatum(c.mapField.getValue(data)) + } + } + } + } else { + if d.list != nil { + switch dt := data.(type) { + case nil: + d.list.appendFunc(dt) + case []any: + d.list.appendFunc(dt) + for _, e := range dt { + if d.item != nil { + d.item.appendFunc(e) + } + var NullParent *fieldPos + for _, f := range d.fields { + if f.parent == NullParent { + continue + } + err := f.appendFunc(f.getValue(e)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + for _, c := range d.children { + if c.list != nil { + c.loadDatum(c.list.getValue(e)) + } + if c.mapField != nil { + c.loadDatum(c.mapField.getValue(e)) + } + } + } + case map[string]any: + d.list.appendFunc(dt["array"]) + for _, e := range dt["array"].([]any) { + if d.item != nil { + d.item.appendFunc(e) + } + var NullParent *fieldPos + for _, f := range d.fields { + if f.parent == NullParent { + continue + } + err := f.appendFunc(f.getValue(e)) + if err != nil { + if err == ErrNullStructData { + NullParent = f + continue + } + return err + } + } + for _, c := range d.children { + c.loadDatum(c.list.getValue(e)) + } + } + default: + d.list.appendFunc(data) + d.item.appendFunc(dt) + } + } + if d.mapField != nil { + switch dt := data.(type) { + case nil: + d.mapField.appendFunc(dt) + case map[string]any: + + d.mapField.appendFunc(dt) + for k, v := range dt { + d.mapKey.appendFunc(k) + if d.mapValue != nil { + d.mapValue.appendFunc(v) + } else { + d.children[0].loadDatum(v) + } + } + } + } + } + return nil +} + +func (d *dataLoader) newChild() *dataLoader { + var child *dataLoader = &dataLoader{ + depth: d.depth + 1, + } + d.children = append(d.children, child) + return child +} + +func (d *dataLoader) newListChild(list *fieldPos) *dataLoader { + var child *dataLoader = &dataLoader{ + list: list, + item: list.childrens[0], + depth: d.depth + 1, + } + d.children = append(d.children, child) + return child +} + +func (d *dataLoader) newMapChild(mapField *fieldPos) *dataLoader { + var child *dataLoader = &dataLoader{ + mapField: mapField, + depth: d.depth + 1, + } + d.children = append(d.children, child) + return child +} + +type fieldPos struct { + parent *fieldPos + fieldName string + builder array.Builder + path []string + isList bool + isItem bool + isStruct bool + isMap bool + typeName string + appendFunc func(val interface{}) error + metadatas arrow.Metadata + childrens []*fieldPos + index, depth int32 +} + +func newFieldPos() *fieldPos { return &fieldPos{index: -1} } + +func (f *fieldPos) children() []*fieldPos { return f.childrens } + +func (f *fieldPos) newChild(childName string, childBuilder array.Builder, meta arrow.Metadata) *fieldPos { + var child fieldPos = fieldPos{ + parent: f, + fieldName: childName, + builder: childBuilder, + metadatas: meta, + index: int32(len(f.childrens)), + depth: f.depth + 1, + } + if f.isList { + child.isItem = true + } + child.path = child.buildNamePath() + f.childrens = append(f.childrens, &child) + return &child +} + +func (f *fieldPos) buildNamePath() []string { + var path []string + var listPath []string + cur := f + for i := f.depth - 1; i >= 0; i-- { + if cur.typeName == "" { + path = append([]string{cur.fieldName}, path...) + } else { + path = append([]string{cur.fieldName, cur.typeName}, path...) + } + if !cur.parent.isMap { + cur = cur.parent + } + } + if f.parent.parent != nil && f.parent.parent.isList { + for i := len(path) - 1; i >= 0; i-- { + if path[i] != "item" { + listPath = append([]string{path[i]}, listPath...) + } else { + return listPath + } + } + } + if f.parent != nil && f.parent.fieldName == "value" { + for i := len(path) - 1; i >= 0; i-- { + if path[i] != "value" { + listPath = append([]string{path[i]}, listPath...) + } else { + return listPath + } + } + } + return path +} + +// NamePath returns a slice of keys making up the path to the field +func (f *fieldPos) namePath() []string { return f.path } + +// GetValue retrieves the value from the map[string]any +// by following the field's key path +func (f *fieldPos) getValue(m any) any { + if _, ok := m.(map[string]any); !ok { + return m + } + for _, key := range f.namePath() { + valueMap, ok := m.(map[string]any) + if !ok { + if key == "item" { + return m + } + return nil + } + m, ok = valueMap[key] + if !ok { + return nil + } + } + return m +} + +// Avro data is loaded to Arrow arrays using the following type mapping: +// +// Avro Go Arrow +// null nil Null +// boolean bool Boolean +// bytes []byte Binary +// float float32 Float32 +// double float64 Float64 +// long int64 Int64 +// int int32 Int32 +// string string String +// array []interface{} List +// enum string Dictionary +// fixed []byte FixedSizeBinary +// map and record map[string]any Struct +// +// mapFieldBuilders builds a tree of field builders matching the Arrow schema +func mapFieldBuilders(b array.Builder, field arrow.Field, parent *fieldPos) { + f := parent.newChild(field.Name, b, field.Metadata) + switch bt := b.(type) { + case *array.BinaryBuilder: + f.appendFunc = func(data interface{}) error { + appendBinaryData(bt, data) + return nil + } + case *array.BinaryDictionaryBuilder: + // has metadata for Avro enum symbols + f.appendFunc = func(data interface{}) error { + appendBinaryDictData(bt, data) + return nil + } + // add Avro enum symbols to builder + sb := array.NewStringBuilder(memory.DefaultAllocator) + for _, v := range field.Metadata.Values() { + sb.Append(v) + } + sa := sb.NewStringArray() + bt.InsertStringDictValues(sa) + case *array.BooleanBuilder: + f.appendFunc = func(data interface{}) error { + appendBoolData(bt, data) + return nil + } + case *array.Date32Builder: + f.appendFunc = func(data interface{}) error { + appendDate32Data(bt, data) + return nil + } + case *array.Decimal128Builder: + f.appendFunc = func(data interface{}) error { + err := appendDecimal128Data(bt, data) + if err != nil { + return err + } + return nil + } + case *array.Decimal256Builder: + f.appendFunc = func(data interface{}) error { + err := appendDecimal256Data(bt, data) + if err != nil { + return err + } + return nil + } + case *types.UUIDBuilder: + f.appendFunc = func(data interface{}) error { + switch dt := data.(type) { + case nil: + bt.AppendNull() + case string: + err := bt.AppendValueFromString(dt) + if err != nil { + return err + } + case []byte: + err := bt.AppendValueFromString(string(dt)) + if err != nil { + return err + } + } + return nil + } + case *array.FixedSizeBinaryBuilder: + f.appendFunc = func(data interface{}) error { + appendFixedSizeBinaryData(bt, data) + return nil + } + case *array.Float32Builder: + f.appendFunc = func(data interface{}) error { + appendFloat32Data(bt, data) + return nil + } + case *array.Float64Builder: + f.appendFunc = func(data interface{}) error { + appendFloat64Data(bt, data) + return nil + } + case *array.Int32Builder: + f.appendFunc = func(data interface{}) error { + appendInt32Data(bt, data) + return nil + } + case *array.Int64Builder: + f.appendFunc = func(data interface{}) error { + appendInt64Data(bt, data) + return nil + } + case *array.LargeListBuilder: + vb := bt.ValueBuilder() + f.isList = true + mapFieldBuilders(vb, field.Type.(*arrow.LargeListType).ElemField(), f) + f.appendFunc = func(data interface{}) error { + switch dt := data.(type) { + case nil: + bt.AppendNull() + case []interface{}: + if len(dt) == 0 { + bt.AppendEmptyValue() + } else { + bt.Append(true) + } + default: + bt.Append(true) + } + return nil + } + case *array.ListBuilder: + vb := bt.ValueBuilder() + f.isList = true + mapFieldBuilders(vb, field.Type.(*arrow.ListType).ElemField(), f) + f.appendFunc = func(data interface{}) error { + switch dt := data.(type) { + case nil: + bt.AppendNull() + case []interface{}: + if len(dt) == 0 { + bt.AppendEmptyValue() + } else { + bt.Append(true) + } + default: + bt.Append(true) + } + return nil + } + case *array.MapBuilder: + // has metadata for objects in values + f.isMap = true + kb := bt.KeyBuilder() + ib := bt.ItemBuilder() + mapFieldBuilders(kb, field.Type.(*arrow.MapType).KeyField(), f) + mapFieldBuilders(ib, field.Type.(*arrow.MapType).ItemField(), f) + f.appendFunc = func(data interface{}) error { + switch data.(type) { + case nil: + bt.AppendNull() + default: + bt.Append(true) + } + return nil + } + case *array.MonthDayNanoIntervalBuilder: + f.appendFunc = func(data interface{}) error { + appendDurationData(bt, data) + return nil + } + case *array.StringBuilder: + f.appendFunc = func(data interface{}) error { + appendStringData(bt, data) + return nil + } + case *array.StructBuilder: + // has metadata for Avro Union named types + f.typeName, _ = field.Metadata.GetValue("typeName") + f.isStruct = true + // create children + for i, p := range field.Type.(*arrow.StructType).Fields() { + mapFieldBuilders(bt.FieldBuilder(i), p, f) + } + f.appendFunc = func(data interface{}) error { + switch data.(type) { + case nil: + bt.AppendNull() + return ErrNullStructData + default: + bt.Append(true) + } + return nil + } + case *array.Time32Builder: + f.appendFunc = func(data interface{}) error { + appendTime32Data(bt, data) + return nil + } + case *array.Time64Builder: + f.appendFunc = func(data interface{}) error { + appendTime64Data(bt, data) + return nil + } + case *array.TimestampBuilder: + f.appendFunc = func(data interface{}) error { + appendTimestampData(bt, data) + return nil + } + } +} + +func appendBinaryData(b *array.BinaryBuilder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case map[string]any: + switch ct := dt["bytes"].(type) { + case nil: + b.AppendNull() + default: + b.Append(ct.([]byte)) + } + default: + b.Append(fmt.Append([]byte{}, data)) + } +} + +func appendBinaryDictData(b *array.BinaryDictionaryBuilder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case string: + b.AppendString(dt) + case map[string]any: + switch v := dt["string"].(type) { + case nil: + b.AppendNull() + case string: + b.AppendString(v) + } + } +} + +func appendBoolData(b *array.BooleanBuilder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case bool: + b.Append(dt) + case map[string]any: + switch v := dt["boolean"].(type) { + case nil: + b.AppendNull() + case bool: + b.Append(v) + } + } +} + +func appendDate32Data(b *array.Date32Builder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case int32: + b.Append(arrow.Date32(dt)) + case map[string]any: + switch v := dt["int"].(type) { + case nil: + b.AppendNull() + case int32: + b.Append(arrow.Date32(v)) + } + } +} + +func appendDecimal128Data(b *array.Decimal128Builder, data interface{}) error { + switch dt := data.(type) { + case nil: + b.AppendNull() + case []byte: + buf := bytes.NewBuffer(dt) + if len(dt) <= 38 { + var intData int64 + err := binary.Read(buf, binary.BigEndian, &intData) + if err != nil { + return err + } + b.Append(decimal128.FromI64(intData)) + } else { + var bigIntData big.Int + b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + } + case map[string]any: + buf := bytes.NewBuffer(dt["bytes"].([]byte)) + if len(dt["bytes"].([]byte)) <= 38 { + var intData int64 + err := binary.Read(buf, binary.BigEndian, &intData) + if err != nil { + return err + } + b.Append(decimal128.FromI64(intData)) + } else { + var bigIntData big.Int + b.Append(decimal128.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + } + } + return nil +} + +func appendDecimal256Data(b *array.Decimal256Builder, data interface{}) error { + switch dt := data.(type) { + case nil: + b.AppendNull() + case []byte: + var bigIntData big.Int + buf := bytes.NewBuffer(dt) + b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + case map[string]any: + var bigIntData big.Int + buf := bytes.NewBuffer(dt["bytes"].([]byte)) + b.Append(decimal256.FromBigInt(bigIntData.SetBytes(buf.Bytes()))) + } + return nil +} + +// Avro duration logical type annotates Avro fixed type of size 12, which stores three little-endian +// unsigned integers that represent durations at different granularities of time. The first stores +// a number in months, the second stores a number in days, and the third stores a number in milliseconds. +func appendDurationData(b *array.MonthDayNanoIntervalBuilder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case []byte: + dur := new(arrow.MonthDayNanoInterval) + dur.Months = int32(binary.LittleEndian.Uint16(dt[:3])) + dur.Days = int32(binary.LittleEndian.Uint16(dt[4:7])) + dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dt[8:]) * 1000000) + b.Append(*dur) + case map[string]any: + switch dtb := dt["bytes"].(type) { + case nil: + b.AppendNull() + case []byte: + dur := new(arrow.MonthDayNanoInterval) + dur.Months = int32(binary.LittleEndian.Uint16(dtb[:3])) + dur.Days = int32(binary.LittleEndian.Uint16(dtb[4:7])) + dur.Nanoseconds = int64(binary.LittleEndian.Uint32(dtb[8:]) * 1000000) + b.Append(*dur) + } + } +} + +func appendFixedSizeBinaryData(b *array.FixedSizeBinaryBuilder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case []byte: + b.Append(dt) + case map[string]any: + switch v := dt["bytes"].(type) { + case nil: + b.AppendNull() + case []byte: + b.Append(v) + } + } +} + +func appendFloat32Data(b *array.Float32Builder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case float32: + b.Append(dt) + case map[string]any: + switch v := dt["float"].(type) { + case nil: + b.AppendNull() + case float32: + b.Append(v) + } + } +} + +func appendFloat64Data(b *array.Float64Builder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case float64: + b.Append(dt) + case map[string]any: + switch v := dt["double"].(type) { + case nil: + b.AppendNull() + case float64: + b.Append(v) + } + } +} + +func appendInt32Data(b *array.Int32Builder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case int: + b.Append(int32(dt)) + case int32: + b.Append(dt) + case map[string]any: + switch v := dt["int"].(type) { + case nil: + b.AppendNull() + case int: + b.Append(int32(v)) + case int32: + b.Append(v) + } + } +} + +func appendInt64Data(b *array.Int64Builder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case int: + b.Append(int64(dt)) + case int64: + b.Append(dt) + case map[string]any: + switch v := dt["long"].(type) { + case nil: + b.AppendNull() + case int: + b.Append(int64(v)) + case int64: + b.Append(v) + } + } +} + +func appendStringData(b *array.StringBuilder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case string: + b.Append(dt) + case map[string]any: + switch v := dt["string"].(type) { + case nil: + b.AppendNull() + case string: + b.Append(v) + } + default: + b.Append(fmt.Sprint(data)) + } +} + +func appendTime32Data(b *array.Time32Builder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case int32: + b.Append(arrow.Time32(dt)) + case map[string]any: + switch v := dt["int"].(type) { + case nil: + b.AppendNull() + case int32: + b.Append(arrow.Time32(v)) + } + } +} + +func appendTime64Data(b *array.Time64Builder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case int64: + b.Append(arrow.Time64(dt)) + case map[string]any: + switch v := dt["long"].(type) { + case nil: + b.AppendNull() + case int64: + b.Append(arrow.Time64(v)) + } + } +} + +func appendTimestampData(b *array.TimestampBuilder, data interface{}) { + switch dt := data.(type) { + case nil: + b.AppendNull() + case int64: + b.Append(arrow.Timestamp(dt)) + case map[string]any: + switch v := dt["long"].(type) { + case nil: + b.AppendNull() + case int64: + b.Append(arrow.Timestamp(v)) + } + } +} diff --git a/go/arrow/avro/schema.go b/go/arrow/avro/schema.go new file mode 100644 index 0000000000000..32e37096c68f2 --- /dev/null +++ b/go/arrow/avro/schema.go @@ -0,0 +1,429 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package avro reads Avro OCF files and presents the extracted data as records +package avro + +import ( + "fmt" + "math" + "strconv" + + "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/decimal128" + "github.com/apache/arrow/go/v15/internal/types" + avro "github.com/hamba/avro/v2" +) + +type schemaNode struct { + name string + parent *schemaNode + schema avro.Schema + union bool + nullable bool + childrens []*schemaNode + arrowField arrow.Field + schemaCache *avro.SchemaCache + index, depth int32 +} + +func newSchemaNode() *schemaNode { + var schemaCache avro.SchemaCache + return &schemaNode{name: "", index: -1, schemaCache: &schemaCache} +} + +func (node *schemaNode) schemaPath() string { + var path string + n := node + for n.parent != nil { + path = "." + n.name + path + n = n.parent + } + return path +} + +func (node *schemaNode) newChild(n string, s avro.Schema) *schemaNode { + child := &schemaNode{ + name: n, + parent: node, + schema: s, + schemaCache: node.schemaCache, + index: int32(len(node.childrens)), + depth: node.depth + 1, + } + node.childrens = append(node.childrens, child) + return child +} +func (node *schemaNode) children() []*schemaNode { return node.childrens } + +// func (node *schemaNode) nodeName() string { return node.name } + +// ArrowSchemaFromAvro returns a new Arrow schema from an Avro schema +func ArrowSchemaFromAvro(schema avro.Schema) (s *arrow.Schema, err error) { + defer func() { + if r := recover(); r != nil { + s = nil + switch x := r.(type) { + case string: + err = fmt.Errorf("invalid avro schema: %s", x) + case error: + err = fmt.Errorf("invalid avro schema: %w", x) + default: + err = fmt.Errorf("invalid avro schema: unknown error") + } + } + }() + n := newSchemaNode() + n.schema = schema + c := n.newChild(n.schema.(avro.NamedSchema).Name(), n.schema) + arrowSchemafromAvro(c) + var fields []arrow.Field + for _, g := range c.children() { + fields = append(fields, g.arrowField) + } + s = arrow.NewSchema(fields, nil) + return s, nil +} + +func arrowSchemafromAvro(n *schemaNode) { + if ns, ok := n.schema.(avro.NamedSchema); ok { + n.schemaCache.Add(ns.Name(), ns) + } + switch st := n.schema.Type(); st { + case "record": + iterateFields(n) + case "enum": + n.schemaCache.Add(n.schema.(avro.NamedSchema).Name(), n.schema.(*avro.EnumSchema)) + symbols := make(map[string]string) + for index, symbol := range n.schema.(avro.PropertySchema).(*avro.EnumSchema).Symbols() { + k := strconv.FormatInt(int64(index), 10) + symbols[k] = symbol + } + var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false} + sl := int64(len(symbols)) + switch { + case sl <= math.MaxUint8: + dt.IndexType = arrow.PrimitiveTypes.Uint8 + case sl > math.MaxUint8 && sl <= math.MaxUint16: + dt.IndexType = arrow.PrimitiveTypes.Uint16 + case sl > math.MaxUint16 && sl <= math.MaxUint32: + dt.IndexType = arrow.PrimitiveTypes.Uint32 + } + n.arrowField = buildArrowField(n, &dt, arrow.MetadataFrom(symbols)) + case "array": + // logical items type + c := n.newChild(n.name, n.schema.(*avro.ArraySchema).Items()) + if isLogicalSchemaType(n.schema.(*avro.ArraySchema).Items()) { + avroLogicalToArrowField(c) + } else { + arrowSchemafromAvro(c) + } + switch c.arrowField.Nullable { + case true: + n.arrowField = arrow.Field{Name: n.name, Type: arrow.ListOfField(c.arrowField), Metadata: c.arrowField.Metadata} + case false: + n.arrowField = arrow.Field{Name: n.name, Type: arrow.ListOfNonNullable(c.arrowField.Type), Metadata: c.arrowField.Metadata} + } + case "map": + n.schemaCache.Add(n.schema.(*avro.MapSchema).Values().(avro.NamedSchema).Name(), n.schema.(*avro.MapSchema).Values()) + c := n.newChild(n.name, n.schema.(*avro.MapSchema).Values()) + arrowSchemafromAvro(c) + n.arrowField = buildArrowField(n, arrow.MapOf(arrow.BinaryTypes.String, c.arrowField.Type), c.arrowField.Metadata) + case "union": + if n.schema.(*avro.UnionSchema).Nullable() { + if len(n.schema.(*avro.UnionSchema).Types()) > 1 { + n.schema = n.schema.(*avro.UnionSchema).Types()[1] + n.union = true + n.nullable = true + arrowSchemafromAvro(n) + } + } + // Avro "fixed" field type = Arrow FixedSize Primitive BinaryType + case "fixed": + n.schemaCache.Add(n.schema.(avro.NamedSchema).Name(), n.schema.(*avro.FixedSchema)) + if isLogicalSchemaType(n.schema) { + avroLogicalToArrowField(n) + } else { + n.arrowField = buildArrowField(n, &arrow.FixedSizeBinaryType{ByteWidth: n.schema.(*avro.FixedSchema).Size()}, arrow.Metadata{}) + } + case "string", "bytes", "int", "long": + if isLogicalSchemaType(n.schema) { + avroLogicalToArrowField(n) + } else { + n.arrowField = buildArrowField(n, avroPrimitiveToArrowType(string(st)), arrow.Metadata{}) + } + case "float", "double", "boolean": + n.arrowField = arrow.Field{Name: n.name, Type: avroPrimitiveToArrowType(string(st)), Nullable: n.nullable} + case "": + refSchema := n.schemaCache.Get(string(n.schema.(*avro.RefSchema).Schema().Name())) + if refSchema == nil { + panic(fmt.Errorf("could not find schema for '%v' in schema cache - %v", n.schemaPath(), n.schema.(*avro.RefSchema).Schema().Name())) + } + n.schema = refSchema + arrowSchemafromAvro(n) + case "null": + n.schemaCache.Add(n.schema.(*avro.MapSchema).Values().(avro.NamedSchema).Name(), &avro.NullSchema{}) + n.nullable = true + n.arrowField = buildArrowField(n, arrow.Null, arrow.Metadata{}) + } +} + +// iterate record Fields() +func iterateFields(n *schemaNode) { + for _, f := range n.schema.(*avro.RecordSchema).Fields() { + switch ft := f.Type().(type) { + // Avro "array" field type + case *avro.ArraySchema: + n.schemaCache.Add(f.Name(), ft.Items()) + // logical items type + c := n.newChild(f.Name(), ft.Items()) + if isLogicalSchemaType(ft.Items()) { + avroLogicalToArrowField(c) + } else { + arrowSchemafromAvro(c) + } + switch c.arrowField.Nullable { + case true: + c.arrowField = arrow.Field{Name: c.name, Type: arrow.ListOfField(c.arrowField), Metadata: c.arrowField.Metadata} + case false: + c.arrowField = arrow.Field{Name: c.name, Type: arrow.ListOfNonNullable(c.arrowField.Type), Metadata: c.arrowField.Metadata} + } + // Avro "enum" field type = Arrow dictionary type + case *avro.EnumSchema: + n.schemaCache.Add(f.Type().(*avro.EnumSchema).Name(), f.Type()) + c := n.newChild(f.Name(), f.Type()) + symbols := make(map[string]string) + for index, symbol := range ft.Symbols() { + k := strconv.FormatInt(int64(index), 10) + symbols[k] = symbol + } + var dt arrow.DictionaryType = arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String, Ordered: false} + sl := len(symbols) + switch { + case sl <= math.MaxUint8: + dt.IndexType = arrow.PrimitiveTypes.Uint8 + case sl > math.MaxUint8 && sl <= math.MaxUint16: + dt.IndexType = arrow.PrimitiveTypes.Uint16 + case sl > math.MaxUint16 && sl <= math.MaxInt: + dt.IndexType = arrow.PrimitiveTypes.Uint32 + } + c.arrowField = buildArrowField(c, &dt, arrow.MetadataFrom(symbols)) + // Avro "fixed" field type = Arrow FixedSize Primitive BinaryType + case *avro.FixedSchema: + n.schemaCache.Add(f.Name(), f.Type()) + c := n.newChild(f.Name(), f.Type()) + if isLogicalSchemaType(f.Type()) { + avroLogicalToArrowField(c) + } else { + arrowSchemafromAvro(c) + } + case *avro.RecordSchema: + n.schemaCache.Add(f.Name(), f.Type()) + c := n.newChild(f.Name(), f.Type()) + iterateFields(c) + // Avro "map" field type - KVP with value of one type - keys are strings + case *avro.MapSchema: + n.schemaCache.Add(f.Name(), ft.Values()) + c := n.newChild(f.Name(), ft.Values()) + arrowSchemafromAvro(c) + c.arrowField = buildArrowField(c, arrow.MapOf(arrow.BinaryTypes.String, c.arrowField.Type), c.arrowField.Metadata) + case *avro.UnionSchema: + if ft.Nullable() { + if len(ft.Types()) > 1 { + n.schemaCache.Add(f.Name(), ft.Types()[1]) + c := n.newChild(f.Name(), ft.Types()[1]) + c.union = true + c.nullable = true + arrowSchemafromAvro(c) + } + } + default: + n.schemaCache.Add(f.Name(), f.Type()) + if isLogicalSchemaType(f.Type()) { + c := n.newChild(f.Name(), f.Type()) + avroLogicalToArrowField(c) + } else { + c := n.newChild(f.Name(), f.Type()) + arrowSchemafromAvro(c) + } + + } + } + var fields []arrow.Field + for _, child := range n.children() { + fields = append(fields, child.arrowField) + } + + namedSchema, ok := isNamedSchema(n.schema) + + var md arrow.Metadata + if ok && namedSchema != n.name+"_data" && n.union { + md = arrow.NewMetadata([]string{"typeName"}, []string{namedSchema}) + } + n.arrowField = buildArrowField(n, arrow.StructOf(fields...), md) +} + +func isLogicalSchemaType(s avro.Schema) bool { + lts, ok := s.(avro.LogicalTypeSchema) + if !ok { + return false + } + if lts.Logical() != nil { + return true + } + return false +} + +func isNamedSchema(s avro.Schema) (string, bool) { + if ns, ok := s.(avro.NamedSchema); ok { + return ns.FullName(), ok + } + return "", false +} + +func buildArrowField(n *schemaNode, t arrow.DataType, m arrow.Metadata) arrow.Field { + return arrow.Field{ + Name: n.name, + Type: t, + Metadata: m, + Nullable: n.nullable, + } +} + +// Avro primitive type. +// +// NOTE: Arrow Binary type is used as a catchall to avoid potential data loss. +func avroPrimitiveToArrowType(avroFieldType string) arrow.DataType { + switch avroFieldType { + // int: 32-bit signed integer + case "int": + return arrow.PrimitiveTypes.Int32 + // long: 64-bit signed integer + case "long": + return arrow.PrimitiveTypes.Int64 + // float: single precision (32-bit) IEEE 754 floating-point number + case "float": + return arrow.PrimitiveTypes.Float32 + // double: double precision (64-bit) IEEE 754 floating-point number + case "double": + return arrow.PrimitiveTypes.Float64 + // bytes: sequence of 8-bit unsigned bytes + case "bytes": + return arrow.BinaryTypes.Binary + // boolean: a binary value + case "boolean": + return arrow.FixedWidthTypes.Boolean + // string: unicode character sequence + case "string": + return arrow.BinaryTypes.String + } + return nil +} + +func avroLogicalToArrowField(n *schemaNode) { + var dt arrow.DataType + // Avro logical types + switch lt := n.schema.(avro.LogicalTypeSchema).Logical(); lt.Type() { + // The decimal logical type represents an arbitrary-precision signed decimal number of the form unscaled × 10-scale. + // A decimal logical type annotates Avro bytes or fixed types. The byte array must contain the two’s-complement + // representation of the unscaled integer value in big-endian byte order. The scale is fixed, and is specified + // using an attribute. + // + // The following attributes are supported: + // scale, a JSON integer representing the scale (optional). If not specified the scale is 0. + // precision, a JSON integer representing the (maximum) precision of decimals stored in this type (required). + case "decimal": + id := arrow.DECIMAL128 + if lt.(*avro.DecimalLogicalSchema).Precision() > decimal128.MaxPrecision { + id = arrow.DECIMAL256 + } + dt, _ = arrow.NewDecimalType(id, int32(lt.(*avro.DecimalLogicalSchema).Precision()), int32(lt.(*avro.DecimalLogicalSchema).Scale())) + + // The uuid logical type represents a random generated universally unique identifier (UUID). + // A uuid logical type annotates an Avro string. The string has to conform with RFC-4122 + case "uuid": + dt = types.NewUUIDType() + + // The date logical type represents a date within the calendar, with no reference to a particular + // time zone or time of day. + // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, + // 1 January 1970 (ISO calendar). + case "date": + dt = arrow.FixedWidthTypes.Date32 + + // The time-millis logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one millisecond. + // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds + // after midnight, 00:00:00.000. + case "time-millis": + dt = arrow.FixedWidthTypes.Time32ms + + // The time-micros logical type represents a time of day, with no reference to a particular calendar, + // time zone or date, with a precision of one microsecond. + // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds + // after midnight, 00:00:00.000000. + case "time-micros": + dt = arrow.FixedWidthTypes.Time64us + + // The timestamp-millis logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one millisecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users in + // their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds + // from the unix epoch, 1 January 1970 00:00:00.000 UTC. + case "timestamp-millis": + dt = arrow.FixedWidthTypes.Timestamp_ms + + // The timestamp-micros logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one microsecond. Please note that time zone + // information gets lost in this process. Upon reading a value back, we can only reconstruct the instant, + // but not the original representation. In practice, such timestamps are typically displayed to users + // in their local time zones, therefore they may be displayed differently depending on the execution environment. + // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds + // from the unix epoch, 1 January 1970 00:00:00.000000 UTC. + case "timestamp-micros": + dt = arrow.FixedWidthTypes.Timestamp_us + + // The local-timestamp-millis logical type represents a timestamp in a local timezone, regardless of + // what specific time zone is considered local, with a precision of one millisecond. + // A local-timestamp-millis logical type annotates an Avro long, where the long stores the number of + // milliseconds, from 1 January 1970 00:00:00.000. + // Note: not implemented in hamba/avro + // case "local-timestamp-millis": + // dt = &arrow.TimestampType{Unit: arrow.Millisecond} + + // The local-timestamp-micros logical type represents a timestamp in a local timezone, regardless of + // what specific time zone is considered local, with a precision of one microsecond. + // A local-timestamp-micros logical type annotates an Avro long, where the long stores the number of + // microseconds, from 1 January 1970 00:00:00.000000. + // case "local-timestamp-micros": + // Note: not implemented in hamba/avro + // dt = &arrow.TimestampType{Unit: arrow.Microsecond} + + // The duration logical type represents an amount of time defined by a number of months, days and milliseconds. + // This is not equivalent to a number of milliseconds, because, depending on the moment in time from which the + // duration is measured, the number of days in the month and number of milliseconds in a day may differ. Other + // standard periods such as years, quarters, hours and minutes can be expressed through these basic periods. + + // A duration logical type annotates Avro fixed type of size 12, which stores three little-endian unsigned integers + // that represent durations at different granularities of time. The first stores a number in months, the second + // stores a number in days, and the third stores a number in milliseconds. + case "duration": + dt = arrow.FixedWidthTypes.MonthDayNanoInterval + } + n.arrowField = buildArrowField(n, dt, arrow.Metadata{}) +} diff --git a/go/arrow/avro/schema_test.go b/go/arrow/avro/schema_test.go new file mode 100644 index 0000000000000..08a3fe1ed7440 --- /dev/null +++ b/go/arrow/avro/schema_test.go @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package avro + +import ( + "fmt" + "testing" + + "github.com/apache/arrow/go/v15/arrow" + hamba "github.com/hamba/avro/v2" +) + +func TestSchemaStringEqual(t *testing.T) { + tests := []struct { + avroSchema string + arrowSchema []arrow.Field + }{ + { + avroSchema: `{ + "fields": [ + { + "name": "inheritNull", + "type": { + "name": "Simple", + "symbols": [ + "a", + "b" + ], + "type": "enum" + } + }, + { + "name": "explicitNamespace", + "type": { + "name": "test", + "namespace": "org.hamba.avro", + "size": 12, + "type": "fixed" + } + }, + { + "name": "fullName", + "type": { + "type": "record", + "name": "fullName_data", + "namespace": "ignored", + "doc": "A name attribute with a fullname, so the namespace attribute is ignored. The fullname is 'a.full.Name', and the namespace is 'a.full'.", + "fields": [{ + "name": "inheritNamespace", + "type": { + "type": "enum", + "name": "Understanding", + "doc": "A simple name (attribute) and no namespace attribute: inherit the namespace of the enclosing type 'a.full.Name'. The fullname is 'a.full.Understanding'.", + "symbols": ["d", "e"] + } + }, { + "name": "md5", + "type": { + "name": "md5_data", + "type": "fixed", + "size": 16, + "namespace": "ignored" + } + } + ] + } + }, + { + "name": "id", + "type": "int" + }, + { + "name": "bigId", + "type": "long" + }, + { + "name": "temperature", + "type": [ + "null", + "float" + ] + }, + { + "name": "fraction", + "type": [ + "null", + "double" + ] + }, + { + "name": "is_emergency", + "type": "boolean" + }, + { + "name": "remote_ip", + "type": [ + "null", + "bytes" + ] + }, + { + "name": "person", + "type": { + "fields": [ + { + "name": "lastname", + "type": "string" + }, + { + "name": "address", + "type": { + "fields": [ + { + "name": "streetaddress", + "type": "string" + }, + { + "name": "city", + "type": "string" + } + ], + "name": "AddressUSRecord", + "type": "record" + } + }, + { + "name": "mapfield", + "type": { + "default": { + }, + "type": "map", + "values": "long" + } + }, + { + "name": "arrayField", + "type": { + "default": [ + ], + "items": "string", + "type": "array" + } + } + ], + "name": "person_data", + "type": "record" + } + }, + { + "name": "decimalField", + "type": { + "logicalType": "decimal", + "precision": 4, + "scale": 2, + "type": "bytes" + } + }, + { + "logicalType": "uuid", + "name": "uuidField", + "type": "string" + }, + { + "name": "timemillis", + "type": { + "type": "int", + "logicalType": "time-millis" + } + }, + { + "name": "timemicros", + "type": { + "type": "long", + "logicalType": "time-micros" + } + }, + { + "name": "timestampmillis", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "timestampmicros", + "type": { + "type": "long", + "logicalType": "timestamp-micros" + } + }, + { + "name": "duration", + "type": { + "name": "duration", + "namespace": "whyowhy", + "logicalType": "duration", + "size": 12, + "type": "fixed" + } + }, + { + "name": "date", + "type": { + "logicalType": "date", + "type": "int" + } + } + ], + "name": "Example", + "type": "record" + }`, + arrowSchema: []arrow.Field{ + { + Name: "inheritNull", + Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String, Ordered: false}, + Metadata: arrow.MetadataFrom(map[string]string{"0": "a", "1": "b"}), + }, + { + Name: "explicitNamespace", + Type: &arrow.FixedSizeBinaryType{ByteWidth: 12}, + }, + { + Name: "fullName", + Type: arrow.StructOf( + arrow.Field{ + Name: "inheritNamespace", + Type: &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String, Ordered: false}, + }, + arrow.Field{ + Name: "md5", + Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}, + }, + ), + }, + { + Name: "id", + Type: arrow.PrimitiveTypes.Int32, + }, + { + Name: "bigId", + Type: arrow.PrimitiveTypes.Int64, + }, + { + Name: "temperature", + Type: arrow.PrimitiveTypes.Float32, + Nullable: true, + }, + { + Name: "fraction", + Type: arrow.PrimitiveTypes.Float64, + Nullable: true, + }, + { + Name: "is_emergency", + Type: arrow.FixedWidthTypes.Boolean, + }, + { + Name: "remote_ip", + Type: arrow.BinaryTypes.Binary, + Nullable: true, + }, + { + Name: "person", + Type: arrow.StructOf( + arrow.Field{ + Name: "lastname", + Type: arrow.BinaryTypes.String, + Nullable: true, + }, + arrow.Field{ + Name: "address", + Type: arrow.StructOf( + arrow.Field{ + Name: "streetaddress", + Type: arrow.BinaryTypes.String, + }, + arrow.Field{ + Name: "city", + Type: arrow.BinaryTypes.String, + }, + ), + }, + arrow.Field{ + Name: "mapfield", + Type: arrow.MapOf(arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int64), + Nullable: true, + }, + arrow.Field{ + Name: "arrayField", + Type: arrow.ListOfNonNullable(arrow.BinaryTypes.String), + }, + ), + }, + { + Name: "decimalField", + Type: &arrow.Decimal128Type{Precision: 4, Scale: 2}, + }, + { + Name: "uuidField", + Type: arrow.BinaryTypes.String, + }, + { + Name: "timemillis", + Type: arrow.FixedWidthTypes.Time32ms, + }, + { + Name: "timemicros", + Type: arrow.FixedWidthTypes.Time64us, + }, + { + Name: "timestampmillis", + Type: arrow.FixedWidthTypes.Timestamp_ms, + }, + { + Name: "timestampmicros", + Type: arrow.FixedWidthTypes.Timestamp_us, + }, + { + Name: "duration", + Type: arrow.FixedWidthTypes.MonthDayNanoInterval, + }, + { + Name: "date", + Type: arrow.FixedWidthTypes.Date32, + }, + }, + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + want := arrow.NewSchema(test.arrowSchema, nil) + schema, err := hamba.ParseBytes([]byte(test.avroSchema)) + if err != nil { + t.Fatalf("%v", err) + } + got, err := ArrowSchemaFromAvro(schema) + if err != nil { + t.Fatalf("%v", err) + } + if !(fmt.Sprintf("%+v", want.String()) == fmt.Sprintf("%+v", got.String())) { + t.Fatalf("got=%v,\n want=%v", got.String(), want.String()) + } else { + t.Logf("schema.String() comparison passed") + } + }) + } +} diff --git a/go/arrow/avro/testdata/arrayrecordmap.avro b/go/arrow/avro/testdata/arrayrecordmap.avro new file mode 100644 index 0000000000000..84a8b59b427b5 Binary files /dev/null and b/go/arrow/avro/testdata/arrayrecordmap.avro differ diff --git a/go/arrow/avro/testdata/githubsamplecommits.avro b/go/arrow/avro/testdata/githubsamplecommits.avro new file mode 100644 index 0000000000000..f16d17d29e991 Binary files /dev/null and b/go/arrow/avro/testdata/githubsamplecommits.avro differ diff --git a/go/go.mod b/go/go.mod index a6c2af7025d32..73a1cb7e7738b 100644 --- a/go/go.mod +++ b/go/go.mod @@ -47,7 +47,9 @@ require ( require ( github.com/google/uuid v1.3.1 + github.com/hamba/avro/v2 v2.17.2 github.com/substrait-io/substrait-go v0.4.2 + github.com/tidwall/sjson v1.2.5 ) require ( @@ -57,14 +59,21 @@ require ( github.com/fatih/color v1.15.0 // indirect github.com/goccy/go-yaml v1.11.0 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/stretchr/objx v0.5.0 // indirect + github.com/tidwall/gjson v1.14.2 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/text v0.13.0 // indirect diff --git a/go/go.sum b/go/go.sum index bdd499c3f5190..2c1edd59e03a3 100644 --- a/go/go.sum +++ b/go/go.sum @@ -34,10 +34,15 @@ github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8i github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hamba/avro/v2 v2.17.2 h1:6PKpEWzJfNnvBgn7m2/8WYaDOUASxfDU+Jyb4ojDgFY= +github.com/hamba/avro/v2 v2.17.2/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= @@ -60,6 +65,13 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -73,12 +85,21 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/substrait-io/substrait-go v0.4.2 h1:buDnjsb3qAqTaNbOR7VKmNgXf4lYQxWEcnSGUWBtmN8= github.com/substrait-io/substrait-go v0.4.2/go.mod h1:qhpnLmrcvAnlZsUyPXZRqldiHapPTXC3t7xFgDi3aQg= +github.com/tidwall/gjson v1.14.2 h1:6BBkirS0rAHjumnjHF6qgy5d2YAJ1TLIaFE2lzfOLqo= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=