Skip to content

Commit

Permalink
GH-36760: [Go] Add Avro OCF reader (#37115)
Browse files Browse the repository at this point in the history
### Rationale for this change

### What changes are included in this PR?
Avro reader 

### Are these changes tested?
Local integration tests, no unit tests yet.

### Are there any user-facing changes?
New Avro reader API

* Closes: #36760

Lead-authored-by: Loïc Alleyne <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Raúl Cumplido <[email protected]>
Co-authored-by: sgilmore10 <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Alenka Frim <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: James Duong <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Dane Pitkin <[email protected]>
Co-authored-by: Kevin Gurney <[email protected]>
Co-authored-by: Matt Topol <[email protected]>
Co-authored-by: Curt Hagenlocher <[email protected]>
Co-authored-by: Jacob Wujciak-Jens <[email protected]>
Co-authored-by: Hyunseok Seo <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: James Duong <[email protected]>
Co-authored-by: Nic Crane <[email protected]>
Co-authored-by: Ivan Chesnov <[email protected]>
Co-authored-by: Diego Fernández Giraldo <[email protected]>
Co-authored-by: Bryce Mecum <[email protected]>
Co-authored-by: Jonathan Keane <[email protected]>
Co-authored-by: Peter Andreas Entschev <[email protected]>
Co-authored-by: abandy <[email protected]>
Co-authored-by: Lei Hou <[email protected]>
Co-authored-by: Yue <[email protected]>
Co-authored-by: davidhcoe <[email protected]>
Co-authored-by: Tsutomu Katsube <[email protected]>
Co-authored-by: Divyansh200102 <[email protected]>
Co-authored-by: Thomas Newton <[email protected]>
Co-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: prmoore77 <[email protected]>
Co-authored-by: Jiaxing Liang <[email protected]>
Co-authored-by: orgadish <[email protected]>
Co-authored-by: Maximilian Muecke <[email protected]>
Co-authored-by: Gavin Murrison <[email protected]>
Co-authored-by: William Ayd <[email protected]>
Co-authored-by: Laurent Goujon <[email protected]>
Co-authored-by: Jin Shang <[email protected]>
Co-authored-by: Alexander Grueneberg <[email protected]>
Co-authored-by: Paul Spangler <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Co-authored-by: Michael Lui <[email protected]>
Co-authored-by: patrick <[email protected]>
Co-authored-by: Dan Homola <[email protected]>
Co-authored-by: Ben Harkins <[email protected]>
Co-authored-by: Nick Hughes <[email protected]>
Co-authored-by: Fernando Mayer <[email protected]>
Co-authored-by: Rok Mihevc <[email protected]>
Co-authored-by: Francis <[email protected]>
Co-authored-by: Donald Tolley <[email protected]>
Co-authored-by: Judah Rand <[email protected]>
Co-authored-by: Eero Lihavainen <[email protected]>
Co-authored-by: Benjamin Schmidt <[email protected]>
Co-authored-by: Phillip LeBlanc <[email protected]>
Co-authored-by: Pierre Moulon <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Fokko Driesprong <[email protected]>
Co-authored-by: Bryan Cutler <[email protected]>
Co-authored-by: Diogo Teles Sant'Anna <[email protected]>
Co-authored-by: Fatemah Panahi <[email protected]>
Co-authored-by: Junming Chen <[email protected]>
Co-authored-by: Chris Larsen <[email protected]>
Co-authored-by: Dan Stone <[email protected]>
Co-authored-by: Tim Schaub <[email protected]>
Co-authored-by: Will Jones <[email protected]>
Co-authored-by: Kevin Gurney <[email protected]>
Co-authored-by: Sarah Gilmore <[email protected]>
Co-authored-by: jeremy <[email protected]>
Co-authored-by: Dan Homola <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: anjakefala <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Jeremy Aguilon <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
Co-authored-by: scoder <[email protected]>
Co-authored-by: voidstar69 <[email protected]>
Co-authored-by: Dewey Dunnington <[email protected]>
Co-authored-by: Ivan Chesnov <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: David Li <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
Show file tree
Hide file tree
Showing 11 changed files with 2,601 additions and 0 deletions.
119 changes: 119 additions & 0 deletions go/arrow/avro/avro2parquet/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"bufio"
"bytes"
"flag"
"fmt"
"log"
"os"
"runtime/pprof"
"time"

"github.com/apache/arrow/go/v15/arrow/avro"
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/compress"
pq "github.com/apache/arrow/go/v15/parquet/pqarrow"
)

var (
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to `file`")
filepath = flag.String("file", "", "avro ocf to convert")
)

func main() {
flag.Parse()
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal("could not create CPU profile: ", err)
}
defer f.Close() // error handling omitted for example
if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err)
}
defer pprof.StopCPUProfile()
}
if *filepath == "" {
fmt.Println("no file specified")
}
chunk := 1024 * 8
ts := time.Now()
log.Println("starting:")
info, err := os.Stat(*filepath)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
filesize := info.Size()
data, err := os.ReadFile(*filepath)
if err != nil {
fmt.Println(err)
os.Exit(2)
}
fmt.Printf("file : %v\nsize: %v MB\n", filepath, float64(filesize)/1024/1024)

r := bytes.NewReader(data)
ior := bufio.NewReaderSize(r, 4096*8)
av2arReader, err := avro.NewOCFReader(ior, avro.WithChunk(chunk))
if err != nil {
fmt.Println(err)
os.Exit(3)
}
fp, err := os.OpenFile(*filepath+".parquet", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
if err != nil {
fmt.Println(err)
os.Exit(4)
}
defer fp.Close()
pwProperties := parquet.NewWriterProperties(parquet.WithDictionaryDefault(true),
parquet.WithVersion(parquet.V2_LATEST),
parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithBatchSize(1024*32),
parquet.WithDataPageSize(1024*1024),
parquet.WithMaxRowGroupLength(64*1024*1024),
)
awProperties := pq.NewArrowWriterProperties(pq.WithStoreSchema())
pr, err := pq.NewFileWriter(av2arReader.Schema(), fp, pwProperties, awProperties)
if err != nil {
fmt.Println(err)
os.Exit(5)
}
defer pr.Close()
fmt.Printf("parquet version: %v\n", pwProperties.Version())
for av2arReader.Next() {
if av2arReader.Err() != nil {
fmt.Println(err)
os.Exit(6)
}
recs := av2arReader.Record()
err = pr.WriteBuffered(recs)
if err != nil {
fmt.Println(err)
os.Exit(7)
}
recs.Release()
}
if av2arReader.Err() != nil {
fmt.Println(av2arReader.Err())
}

pr.Close()
log.Printf("time to convert: %v\n", time.Since(ts))
}
85 changes: 85 additions & 0 deletions go/arrow/avro/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package avro

import (
"errors"
"fmt"
"io"
)

func (r *OCFReader) decodeOCFToChan() {
defer close(r.avroChan)
for r.r.HasNext() {
select {
case <-r.readerCtx.Done():
r.err = fmt.Errorf("avro decoding cancelled, %d records read", r.avroDatumCount)
return
default:
var datum any
err := r.r.Decode(&datum)
if err != nil {
if errors.Is(err, io.EOF) {
r.err = nil
return
}
r.err = err
return
}
r.avroChan <- datum
r.avroDatumCount++
}
}
}

func (r *OCFReader) recordFactory() {
defer close(r.recChan)
r.primed = true
recChunk := 0
switch {
case r.chunk < 1:
for data := range r.avroChan {
err := r.ldr.loadDatum(data)
if err != nil {
r.err = err
return
}
}
r.recChan <- r.bld.NewRecord()
r.bldDone <- struct{}{}
case r.chunk >= 1:
for data := range r.avroChan {
if recChunk == 0 {
r.bld.Reserve(r.chunk)
}
err := r.ldr.loadDatum(data)
if err != nil {
r.err = err
return
}
recChunk++
if recChunk >= r.chunk {
r.recChan <- r.bld.NewRecord()
recChunk = 0
}
}
if recChunk != 0 {
r.recChan <- r.bld.NewRecord()
}
r.bldDone <- struct{}{}
}
}
Loading

0 comments on commit 5ab60ea

Please sign in to comment.