From eabe0358e97e88ebc5b12f7f1c511d4ce86129fe Mon Sep 17 00:00:00 2001
From: Noah Treuhaft
Date: Wed, 3 Jan 2024 14:29:47 -0500
Subject: [PATCH] Upgrade Zed to v1.13.0

---
 cmd/zync/from-kafka/command.go | 10 +++++-----
 connectjson/connectjson.go     | 30 +++++++++++++---------------
 etl/pipeline.go                | 36 ++++++++++++++++------------------
 etl/pool.go                    | 20 +++++++++----------
 fifo/consumer.go               | 26 ++++++++++++------------
 fifo/lake.go                   |  5 +++--
 fifo/producer.go               | 22 +++++++++------------
 go.mod                         |  2 +-
 go.sum                         |  4 ++--
 zavro/decoder.go               | 12 +++++-------
 zavro/encoder.go               | 12 ++++++------
 11 files changed, 84 insertions(+), 95 deletions(-)

diff --git a/cmd/zync/from-kafka/command.go b/cmd/zync/from-kafka/command.go
index a5f9ade..b8a6fe3 100644
--- a/cmd/zync/from-kafka/command.go
+++ b/cmd/zync/from-kafka/command.go
@@ -148,8 +148,8 @@ func (f *From) Run(args []string) error {
 	}

 	var fifoLakes []*fifo.Lake
-	var fifoLakeChs []<-chan *zed.Value
-	topicToChs := map[string][]chan<- *zed.Value{}
+	var fifoLakeChs []<-chan zed.Value
+	topicToChs := map[string][]chan<- zed.Value{}
 	topicToOffset := map[string]int64{}
 	group, groupCtx := errgroup.WithContext(ctx)
@@ -161,7 +161,7 @@
 		if err != nil {
 			return fmt.Errorf("pool %s: %w", pool, err)
 		}
-		ch := make(chan *zed.Value)
+		ch := make(chan zed.Value)
 		mu.Lock()
 		fifoLakes = append(fifoLakes, fifoLake)
 		fifoLakeChs = append(fifoLakeChs, ch)
@@ -215,7 +215,7 @@
 	return group.Wait()
 }

-func (f *From) runRead(ctx context.Context, c *fifo.Consumer, topicToChs map[string][]chan<- *zed.Value) error {
+func (f *From) runRead(ctx context.Context, c *fifo.Consumer, topicToChs map[string][]chan<- zed.Value) error {
 	for {
 		val, err := c.ReadValue(ctx)
 		if err != nil {
@@ -243,7 +243,7 @@
 	}
 }

-func (f *From) runLoad(ctx, timeoutCtx context.Context, zctx *zed.Context, fifoLake *fifo.Lake, shaper string, ch <-chan *zed.Value) error {
+func (f *From) runLoad(ctx, timeoutCtx context.Context, zctx *zed.Context, fifoLake *fifo.Lake, shaper string, ch <-chan zed.Value) error {
 	ticker := time.NewTicker(f.interval)
 	defer ticker.Stop()
 	// Stop ticker until data arrives.
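The command.go hunks above show the pattern that repeats throughout this patch: Zed v1.13.0 moves zed.Value from pointer to value semantics. Values are passed and returned by value, the old Type field becomes a Type() accessor, and error paths return the zed.Null value instead of a nil pointer. Below is a minimal standalone sketch of the new calling convention; describe is a hypothetical helper for illustration, not code from this patch.

package main

import (
	"fmt"

	"github.com/brimdata/zed"
	"github.com/brimdata/zed/zson"
)

// describe prints a value as ZSON under the v1.13 conventions used in
// this patch: zed.Value travels by value, Type() is an accessor method
// rather than a struct field, and zed.Null (a value, not a nil
// pointer) stands in for "no result" on error paths.
func describe(val zed.Value) (zed.Value, error) {
	if zed.TypeUnder(val.Type()) == zed.TypeNull {
		return zed.Null, fmt.Errorf("unexpected null value")
	}
	fmt.Println(zson.FormatValue(val))
	return val, nil
}

func main() {
	// zed.NewValue now returns zed.Value rather than *zed.Value.
	val := zed.NewValue(zed.TypeString, zed.EncodeString("hello"))
	if _, err := describe(val); err != nil {
		panic(err)
	}
}

Value semantics also remove the need for decoders to hold a long-lived val field for reuse, which is why the val members disappear from the Decoder structs in connectjson and zavro below.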
diff --git a/connectjson/connectjson.go b/connectjson/connectjson.go
index 19b282c..51e07ce 100644
--- a/connectjson/connectjson.go
+++ b/connectjson/connectjson.go
@@ -12,7 +12,7 @@ import (
 	"strings"

 	"github.com/brimdata/zed"
-	"github.com/brimdata/zed/runtime/expr"
+	"github.com/brimdata/zed/runtime/sam/expr"
 	"github.com/brimdata/zed/zcode"
 	"github.com/brimdata/zed/zio/jsonio"
 	"github.com/brimdata/zed/zson"
@@ -27,11 +27,11 @@ type connectSchema struct {
 	Field string `json:"field,omitempty"`
 }

-func Encode(val *zed.Value) ([]byte, error) {
-	if zed.TypeUnder(val.Type) == zed.TypeNull {
+func Encode(val zed.Value) ([]byte, error) {
+	if zed.TypeUnder(val.Type()) == zed.TypeNull {
 		return nil, nil
 	}
-	schema, err := marshalSchema(val.Type)
+	schema, err := marshalSchema(val.Type())
 	if err != nil {
 		return nil, err
 	}
@@ -39,7 +39,7 @@ func Encode(val *zed.Value) ([]byte, error) {
 		Schema  *connectSchema `json:"schema"`
 		Payload interface{}    `json:"payload"`
 	}{
-		schema, marshalPayload(val.Type, val.Bytes()),
+		schema, marshalPayload(val.Type(), val.Bytes()),
 	})
 }
@@ -169,7 +169,6 @@ type Decoder struct {
 	jsonioReader *jsonio.Reader
 	shapers      map[string]*expr.ConstShaper
 	this         expr.This
-	val          zed.Value
 }

 func NewDecoder(zctx *zed.Context) *Decoder {
@@ -184,7 +183,7 @@ func NewDecoder(zctx *zed.Context) *Decoder {
 	}
 }

-func (c *Decoder) Decode(b []byte) (*zed.Value, error) {
+func (c *Decoder) Decode(b []byte) (zed.Value, error) {
 	b = bytes.TrimSpace(b)
 	if len(b) == 0 {
 		return zed.Null, nil
@@ -202,13 +201,13 @@ func (c *Decoder) Decode(b []byte) (*zed.Value, error) {
 		return nil
 	})
 	if err != nil {
-		return nil, err
+		return zed.Null, err
 	}
 	c.buf.Reset()
 	c.buf.Write(payload)
 	val, err := c.jsonioReader.Read()
 	if err != nil {
-		return nil, err
+		return zed.Null, err
 	}
 	// Using the schema's JSON encoding as the key here means different
 	// encodings of the same schema won't share an entry, but that should be
@@ -217,16 +216,16 @@ func (c *Decoder) Decode(b []byte) (*zed.Value, error) {
 	if !ok {
 		var cs connectSchema
 		if err := json.Unmarshal(schema, &cs); err != nil {
-			return nil, err
+			return zed.Null, err
 		}
 		_, typ, err := c.decodeSchema(&cs)
 		if err != nil {
-			return nil, err
+			return zed.Null, err
 		}
 		shaper = expr.NewConstShaper(c.zctx, &c.this, typ, expr.Cast|expr.Order)
 		c.shapers[string(schema)] = shaper
 	}
-	return c.decodeBytes(shaper.Eval(c.ectx, val)), nil
+	return c.decodeBytes(shaper.Eval(c.ectx, *val)), nil
 }

 func (c *Decoder) decodeSchema(s *connectSchema) (string, zed.Type, error) {
@@ -277,12 +276,12 @@ func (c *Decoder) decodeSchema(s *connectSchema) (string, zed.Type, error) {
 	return s.Field, typ, err
 }

-func (c *Decoder) decodeBytes(val *zed.Value) *zed.Value {
+func (c *Decoder) decodeBytes(val zed.Value) zed.Value {
 	if val.IsNull() {
 		return val
 	}
 	c.builder.Truncate()
-	err := Walk(val.Type, val.Bytes(), func(typ zed.Type, bytes zcode.Bytes) error {
+	err := Walk(val.Type(), val.Bytes(), func(typ zed.Type, bytes zcode.Bytes) error {
 		if bytes == nil {
 			c.builder.Append(nil)
 		} else if zed.IsContainerType(typ) {
@@ -304,8 +303,7 @@ func (c *Decoder) decodeBytes(val *zed.Value) *zed.Value {
 	if err != nil {
 		panic(err)
 	}
-	c.val = *zed.NewValue(val.Type, c.builder.Bytes().Body())
-	return &c.val
+	return zed.NewValue(val.Type(), c.builder.Bytes().Body())
 }

 func Walk(typ zed.Type, body zcode.Bytes, visit zed.Visitor) error {
diff --git a/etl/pipeline.go b/etl/pipeline.go
index 4d09679..da089a0 100644
--- a/etl/pipeline.go
+++ b/etl/pipeline.go
@@ -98,20 +98,19 @@ func (p *Pipeline) writeToOutputPool(ctx context.Context, batch *zbuf.Array) err
 	if err != nil {
 		return err
 	}
-	vals := batch.Values()
-	for k := range vals {
+	for _, rec := range batch.Values() {
 		//XXX This still doesn't work with the zctx bug fix. See issue #31
 		//if vals[k].Type == p.doneType {
 		//	out.Append(&vals[k])
 		//}
-		if typedef, ok := vals[k].Type.(*zed.TypeNamed); ok && typedef.Name == "done" {
-			out.Append(&vals[k])
+		if named, ok := rec.Type().(*zed.TypeNamed); ok && named.Name == "done" {
+			out.Append(rec)
 		}
-		if extra := vals[k].Deref("left"); extra != nil {
-			out.Append(extra)
+		if extra := rec.Deref("left"); extra != nil {
+			out.Append(*extra)
 		}
-		if extra := vals[k].Deref("right"); extra != nil {
-			out.Append(extra)
+		if extra := rec.Deref("right"); extra != nil {
+			out.Append(*extra)
 		}
 	}
 	//XXX We need to track the commitID and use new commit-only-if
@@ -130,26 +129,24 @@ func insertOffsets(ctx context.Context, zctx *zed.Context, doneType zed.Type, ba
 	// flow count() but that is not implemented yet. Instead, we just format
 	// up some ZSON that can then insert the proper offsets in each record.
 	var zsons strings.Builder
-	vals := batch.Values()
-	for k := range vals {
+	for _, rec := range batch.Values() {
 		// This pointer comparison should work but it doesn't right now.
 		// Are they all allocated in the same zctx?
 		//if vals[k].Type == doneType {
 		//	continue
 		//}
-		if typedef, ok := vals[k].Type.(*zed.TypeNamed); ok && typedef.Name == "done" {
+		if named, ok := rec.Type().(*zed.TypeNamed); ok && named.Name == "done" {
 			continue
 		}
-		if vals[k].Deref("left") != nil {
+		if rec.Deref("left") != nil {
 			continue
 		}
-		rec := zson.FormatValue(&vals[k])
-		topic, _, err := getKafkaMeta(&vals[k])
+		topic, _, err := getKafkaMeta(rec)
 		if err != nil {
 			return nil, err
 		}
 		off := offsets[topic]
-		zsons.WriteString(fmt.Sprintf("{rec:%s,offset:%d}\n", rec, off))
+		zsons.WriteString(fmt.Sprintf("{rec:%s,offset:%d}\n", zson.FormatValue(rec), off))
 		offsets[topic] = off + 1
 	}
 	comp := compiler.NewCompiler()
@@ -162,20 +159,21 @@
 	if err != nil {
 		return nil, err
 	}
-	return NewArrayFromReader(q.AsReader())
+	defer q.Pull(true)
+	return NewArrayFromReader(zbuf.PullerReader(q))
 }

-func getKafkaMeta(rec *zed.Value) (string, int64, error) {
+func getKafkaMeta(rec zed.Value) (string, int64, error) {
 	// XXX this API should be simplified in zed package
 	kafkaRec := rec.Deref("kafka")
 	if kafkaRec == nil {
 		return "", 0, fmt.Errorf("value missing 'kafka' metadata field: %s", zson.FormatValue(rec))
 	}
-	topic, err := FieldAsString(kafkaRec, "topic")
+	topic, err := FieldAsString(*kafkaRec, "topic")
 	if err != nil {
 		return "", 0, err
 	}
-	offset, err := FieldAsInt(kafkaRec, "offset")
+	offset, err := FieldAsInt(*kafkaRec, "offset")
 	if err != nil {
 		return "", 0, err
 	}
diff --git a/etl/pool.go b/etl/pool.go
index 0b13c85..a808c45 100644
--- a/etl/pool.go
+++ b/etl/pool.go
@@ -66,9 +66,7 @@ func (p *Pool) NextProducerOffsets(ctx context.Context) (map[string]int64, error
 	// Note at start-up if there are no offsets, then we will return an empty
 	// map and the caller will get offset 0 for the next offset of any lookups.
 	offsets := make(map[string]int64)
-	vals := batch.Values()
-	for k := range vals {
-		rec := &vals[k]
+	for _, rec := range batch.Values() {
 		offset, err := FieldAsInt(rec, "offset")
 		if err != nil {
 			return nil, err
 		}
@@ -90,34 +88,34 @@ func NewArrayFromReader(zr zio.Reader) (*zbuf.Array, error) {
 	return &a, nil
 }

-func Field(val *zed.Value, field string) (*zed.Value, error) {
+func Field(val zed.Value, field string) (zed.Value, error) {
 	fieldVal := val.Deref(field)
 	if fieldVal == nil {
-		return nil, fmt.Errorf("field %q not found in %q", field, zson.FormatValue(val))
+		return zed.Null, fmt.Errorf("field %q not found in %q", field, zson.FormatValue(val))
 	}
 	if fieldVal.IsNull() {
-		return nil, fmt.Errorf("field %q null in %q", field, zson.FormatValue(val))
+		return zed.Null, fmt.Errorf("field %q null in %q", field, zson.FormatValue(val))
 	}
-	return fieldVal, nil
+	return *fieldVal, nil
 }

-func FieldAsInt(val *zed.Value, field string) (int64, error) {
+func FieldAsInt(val zed.Value, field string) (int64, error) {
 	fieldVal, err := Field(val, field)
 	if err != nil {
 		return 0, err
 	}
-	if !zed.IsInteger(fieldVal.Type.ID()) {
+	if !zed.IsInteger(fieldVal.Type().ID()) {
 		return 0, fmt.Errorf("field %q not an integer in %q", field, zson.FormatValue(val))
 	}
 	return fieldVal.AsInt(), nil
 }

-func FieldAsString(val *zed.Value, field string) (string, error) {
+func FieldAsString(val zed.Value, field string) (string, error) {
 	fieldVal, err := Field(val, field)
 	if err != nil {
 		return "", err
 	}
-	if fieldVal.Type.ID() != zed.IDString {
+	if fieldVal.Type().ID() != zed.IDString {
 		return "", fmt.Errorf("field %q not a string in %q", field, zson.FormatValue(val))
 	}
 	return fieldVal.AsString(), nil
diff --git a/fifo/consumer.go b/fifo/consumer.go
index 5d9f3bc..7a1e7f3 100644
--- a/fifo/consumer.go
+++ b/fifo/consumer.go
@@ -32,10 +32,10 @@ type Consumer struct {

 // decoder wraps the Decode method.
 //
-// Implementations retain ownership of val and val.Bytes, which remain valid
-// until the next Decode..
+// Implementations retain ownership of val.Bytes, which remain valid
+// until the next Decode.
 type decoder interface {
-	Decode(b []byte) (val *zed.Value, err error)
+	Decode(b []byte) (val zed.Value, err error)
 }

 func NewConsumer(zctx *zed.Context, opts []kgo.Opt, reg *srclient.SchemaRegistryClient, format string, topics map[string]int64, meta bool) (*Consumer, error) {
@@ -81,7 +81,7 @@ func (c *Consumer) Close() {

 // ReadValue returns the next value. Unlike zio.Reader.Read, the caller
 // receives ownership of zed.Value.Bytes.
-func (c *Consumer) ReadValue(ctx context.Context) (*zed.Value, error) {
+func (c *Consumer) ReadValue(ctx context.Context) (zed.Value, error) {
 	for {
 		if !c.recordIter.Done() {
 			return c.handle(c.recordIter.Next())
 		}
@@ -89,9 +89,9 @@ func (c *Consumer) ReadValue(ctx context.Context) (*zed.Value, error) {
 		fetches := c.kclient.PollFetches(ctx)
 		for _, e := range fetches.Errors() {
 			if e.Topic != "" {
-				return nil, fmt.Errorf("topic %s, partition %d: %w", e.Topic, e.Partition, e.Err)
+				return zed.Null, fmt.Errorf("topic %s, partition %d: %w", e.Topic, e.Partition, e.Err)
 			}
-			return nil, e.Err
+			return zed.Null, e.Err
 		}
 		c.recordIter = *fetches.RecordIter()
 	}
@@ -114,9 +114,9 @@ func (c *Consumer) Run(ctx context.Context, w zio.Writer, timeout time.Duration)
 	}
 }

-func (c *Consumer) handle(krec *kgo.Record) (*zed.Value, error) {
+func (c *Consumer) handle(krec *kgo.Record) (zed.Value, error) {
 	if saved := c.savedOffsets[krec.Topic]; krec.Offset < saved {
-		return nil, fmt.Errorf("topic %s, partition %d: received offset %d is less than saved offset %d",
+		return zed.Null, fmt.Errorf("topic %s, partition %d: received offset %d is less than saved offset %d",
 			krec.Topic, krec.Partition, krec.Offset, saved)
 	}
 	c.savedOffsets[krec.Topic] = krec.Offset
@@ -131,18 +131,18 @@ func (c *Consumer) handle(krec *kgo.Record) (*zed.Value, error) {
 	}
 	key, err := c.decoder.Decode(krec.Key)
 	if err != nil {
-		return nil, err
+		return zed.Null, err
 	}
-	keyType := key.Type
+	keyType := key.Type()
 	b.Append(key.Bytes())
 	val, err := c.decoder.Decode(krec.Value)
 	if err != nil {
-		return nil, err
+		return zed.Null, err
 	}
 	b.Append(val.Bytes())
-	outerType, err := c.outerType(keyType, val.Type)
+	outerType, err := c.outerType(keyType, val.Type())
 	if err != nil {
-		return nil, err
+		return zed.Null, err
 	}
 	return zed.NewValue(outerType, b.Bytes()), nil
 }
diff --git a/fifo/lake.go b/fifo/lake.go
index fdbe581..75c67dd 100644
--- a/fifo/lake.go
+++ b/fifo/lake.go
@@ -74,7 +74,7 @@ func (l *Lake) NextConsumerOffset(ctx context.Context, topic string) (int64, err
 		// This should not happen.
 		return 0, errors.New("'tail 1' returned more than one record")
 	}
-	offset, err := etl.FieldAsInt(&vals[0], "offset")
+	offset, err := etl.FieldAsInt(vals[0], "offset")
 	if err != nil {
 		return 0, err
 	}
@@ -101,5 +101,6 @@ func RunLocalQuery(ctx context.Context, zctx *zed.Context, batch *zbuf.Array, qu
 	if err != nil {
 		return nil, err
 	}
-	return etl.NewArrayFromReader(q.AsReader())
+	defer q.Pull(true)
+	return etl.NewArrayFromReader(zbuf.PullerReader(q))
 }
diff --git a/fifo/producer.go b/fifo/producer.go
index 34b1d02..d1ca40c 100644
--- a/fifo/producer.go
+++ b/fifo/producer.go
@@ -15,13 +15,13 @@ import (
 )

 type Producer struct {
-	encode  func(*zed.Value) ([]byte, error)
+	encode  func(zed.Value) ([]byte, error)
 	kclient *kgo.Client
 	topic   string
 }

 func NewProducer(opts []kgo.Opt, reg *srclient.SchemaRegistryClient, format, topic, namespace string) (*Producer, error) {
-	var encode func(*zed.Value) ([]byte, error)
+	var encode func(zed.Value) ([]byte, error)
 	switch format {
 	case "avro":
 		encode = zavro.NewEncoder(namespace, reg).Encode
@@ -55,7 +55,7 @@ func (p *Producer) Run(ctx context.Context, reader zio.Reader) error {
 		if rec == nil || err != nil {
 			break
 		}
-		err = p.write(ctx, rec)
+		err = p.write(ctx, *rec)
 		if err != nil {
 			break
 		}
@@ -71,29 +71,25 @@ func (p *Producer) Run(ctx context.Context, reader zio.Reader) error {
 }

 func (p *Producer) Send(ctx context.Context, batch zbuf.Batch) error {
-	values := batch.Values()
-	for k := range values {
-		if err := p.write(ctx, &values[k]); err != nil {
+	for _, rec := range batch.Values() {
+		if err := p.write(ctx, rec); err != nil {
 			return err
 		}
 	}
 	return p.kclient.Flush(ctx)
 }

-func (p *Producer) write(ctx context.Context, rec *zed.Value) error {
-	key := rec.Deref("key")
-	if key == nil {
-		key = zed.Null
-	}
+func (p *Producer) write(ctx context.Context, rec zed.Value) error {
+	key := rec.Deref("key").MissingAsNull()
 	val := rec.Deref("value")
 	if val == nil {
-		val = rec
+		val = &rec
 	}
 	keyBytes, err := p.encode(key)
 	if err != nil {
 		return err
 	}
-	valBytes, err := p.encode(val)
+	valBytes, err := p.encode(*val)
 	if err != nil {
 		return err
 	}
diff --git a/go.mod b/go.mod
index a2160cf..9a026fc 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/brimdata/zync
 go 1.21

 require (
-	github.com/brimdata/zed v1.12.0
+	github.com/brimdata/zed v1.13.0
 	github.com/buger/jsonparser v1.1.1
 	github.com/go-avro/avro v0.0.0-20171219232920-444163702c11
 	github.com/riferrei/srclient v0.4.0
diff --git a/go.sum b/go.sum
index 42605b2..2b61ef6 100644
--- a/go.sum
+++ b/go.sum
@@ -21,8 +21,8 @@ github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f h1:y06x6vGnFYf
 github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f/go.mod h1:2stgcRjl6QmW+gU2h5E7BQXg4HU0gzxKWDuT5HviN9s=
 github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
-github.com/brimdata/zed v1.12.0 h1:JzDjjcdEI6GICUG07d2B3tQnRWcfUav9bglXzKnVXpY=
-github.com/brimdata/zed v1.12.0/go.mod h1:bD4XpGLwJ2ZHUDBVtPuY+KCF5ogemaOXW2NTrVnRuaw=
+github.com/brimdata/zed v1.13.0 h1:BBx8QZ9Q+qEAy/xZOdac/sO6z7AOfX08M1Yo+Yv7G8M=
+github.com/brimdata/zed v1.13.0/go.mod h1:Q5B3uAVimKLpO45a6konNLHEdyMELY2LaX2BMnKCJYM=
 github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
 github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
diff --git a/zavro/decoder.go b/zavro/decoder.go
index dd22ed4..806a1a9 100644
--- a/zavro/decoder.go
+++ b/zavro/decoder.go
@@ -17,7 +17,6 @@
 type Decoder struct {
 	builder zcode.Builder
 	schemas map[int]schemaAndType
-	val     zed.Value
 }

 type schemaAndType struct {
@@ -33,24 +32,23 @@ func NewDecoder(registry *srclient.SchemaRegistryClient, zctx *zed.Context) *Dec
 	}
 }

-func (d *Decoder) Decode(b []byte) (*zed.Value, error) {
+func (d *Decoder) Decode(b []byte) (zed.Value, error) {
 	if len(b) == 0 {
 		return zed.Null, nil
 	}
 	if len(b) < 5 {
-		return nil, fmt.Errorf("Kafka-Avro header is too short: len %d", len(b))
+		return zed.Null, fmt.Errorf("Kafka-Avro header is too short: len %d", len(b))
 	}
 	id := int(binary.BigEndian.Uint32(b[1:5]))
 	schema, typ, err := d.getSchema(id)
 	if err != nil {
-		return nil, fmt.Errorf("could not retrieve schema ID %d: %w", id, err)
+		return zed.Null, fmt.Errorf("could not retrieve schema ID %d: %w", id, err)
 	}
 	d.builder.Truncate()
 	if err := Decode(&d.builder, b[5:], schema); err != nil {
-		return nil, err
+		return zed.Null, err
 	}
-	d.val = *zed.NewValue(typ, d.builder.Bytes().Body())
-	return &d.val, nil
+	return zed.NewValue(typ, d.builder.Bytes().Body()), nil
 }

 func (d *Decoder) getSchema(id int) (avro.Schema, zed.Type, error) {
diff --git a/zavro/encoder.go b/zavro/encoder.go
index 2d30be9..ea93e11 100644
--- a/zavro/encoder.go
+++ b/zavro/encoder.go
@@ -26,12 +26,12 @@ func NewEncoder(namespace string, registry *srclient.SchemaRegistryClient) *Enco
 	return &Encoder{namespace: namespace, registry: registry, schemaIDs: map[zed.Type]int{}}
 }

-func (e *Encoder) Encode(val *zed.Value) ([]byte, error) {
-	id, err := e.getSchemaID(val.Type)
+func (e *Encoder) Encode(val zed.Value) ([]byte, error) {
+	id, err := e.getSchemaID(val.Type())
 	if err != nil {
 		return nil, err
 	}
-	return Encode(nil, uint32(id), *val)
+	return Encode(nil, uint32(id), val)
 }

 func (e *Encoder) getSchemaID(typ zed.Type) (int, error) {
@@ -77,7 +77,7 @@ func zlen(zv zcode.Bytes) (int, error) {
 }

 func encodeAny(dst []byte, zv zed.Value) ([]byte, error) {
-	switch typ := zed.TypeUnder(zv.Type).(type) {
+	switch typ := zed.TypeUnder(zv.Type()).(type) {
 	case *zed.TypeRecord:
 		return encodeRecord(dst, typ, zv.Bytes())
 	case *zed.TypeArray:
@@ -100,7 +100,7 @@ func encodeArray(dst []byte, elemType zed.Type, body zcode.Bytes) ([]byte, error
 	}
 	dst = appendVarint(dst, int64(cnt))
 	for it := body.Iter(); !it.Done(); {
-		dst, err = encodeAny(dst, *zed.NewValue(elemType, it.Next()))
+		dst, err = encodeAny(dst, zed.NewValue(elemType, it.Next()))
 		if err != nil {
 			return nil, err
 		}
@@ -131,7 +131,7 @@ func encodeRecord(dst []byte, typ *zed.TypeRecord, body zcode.Bytes) ([]byte, er
 	// the type's position in the union.
 	dst = appendVarint(dst, 1)
 	var err error
-	dst, err = encodeAny(dst, *zed.NewValue(f.Type, body))
+	dst, err = encodeAny(dst, zed.NewValue(f.Type, body))
 	if err != nil {
 		return nil, err
 	}
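A second pattern worth calling out, seen in etl/pipeline.go and fifo/lake.go above, is how locally run Zed queries are drained: the q.AsReader() convenience used with v1.12 is gone, so the result is consumed as a zbuf.Puller adapted via zbuf.PullerReader, and q.Pull(true) is deferred so the query is shut down on every exit path. Below is a minimal sketch of that pattern, assuming only the zbuf API visible in this diff; readAll is a hypothetical helper, not part of zync.

package example

import "github.com/brimdata/zed/zbuf"

// readAll drains a query result into a zbuf.Array the v1.13 way: the
// runtime hands back a zbuf.Puller, zbuf.PullerReader adapts it to a
// zio.Reader, and the deferred q.Pull(true) tells the query to shut
// down even when an error causes an early return.
func readAll(q zbuf.Puller) (*zbuf.Array, error) {
	defer q.Pull(true)
	reader := zbuf.PullerReader(q)
	var a zbuf.Array
	for {
		val, err := reader.Read()
		if err != nil {
			return nil, err
		}
		if val == nil {
			// A nil value signals end of stream.
			return &a, nil
		}
		a.Append(*val)
	}
}

In the patch itself the draining loop lives in etl.NewArrayFromReader; RunLocalQuery and insertOffsets only add the deferred Pull(true) and the PullerReader adapter around it.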